Skip to main content

msquic_async/
connection.rs

1use crate::buffer::WriteBuffer;
2use crate::stream::{ReadStream, StartError as StreamStartError, Stream, StreamType};
3
4#[cfg(feature = "msquic-2-5")]
5use msquic_v2_5 as msquic;
6#[cfg(feature = "msquic-seera")]
7use seera_msquic as msquic;
8
9use std::collections::VecDeque;
10use std::fs::File;
11use std::future::Future;
12use std::io::{Seek, SeekFrom, Write};
13use std::net::SocketAddr;
14use std::ops::Deref;
15use std::pin::Pin;
16use std::sync::{Arc, Mutex};
17use std::task::{Context, Poll, Waker};
18
19use bytes::Bytes;
20use libc::c_void;
21use msquic::ffi::QUIC_TLS_SECRETS__bindgen_ty_1;
22use thiserror::Error;
23use tracing::{info, trace};
24
/// Cloneable handle to a QUIC connection.
///
/// The handle wraps an `Arc<ConnectionInstance>`, so every clone refers to the same
/// underlying connection instance.
#[derive(Clone)]
pub struct Connection(Arc<ConnectionInstance>);
27
28impl Connection {
29    /// Create a new connection.
30    ///
31    /// The connection is not started until `start` is called.
32    pub fn new(registration: &msquic::Registration) -> Result<Self, ConnectionError> {
33        let inner = Arc::new(ConnectionInner::new(ConnectionState::Open, None, None));
34        let inner_in_ev = inner.clone();
35        let msquic_conn = msquic::Connection::open(registration, move |conn_ref, ev| {
36            inner_in_ev.callback_handler_impl(conn_ref, ev)
37        })
38        .map_err(ConnectionError::OtherError)?;
39        let instance = Arc::new(ConnectionInstance { inner, msquic_conn });
40        trace!(
41            "ConnectionInstance({:p}, Inner: {:p}) Open by local",
42            instance,
43            instance.inner
44        );
45        Ok(Self(instance))
46    }
47
48    pub(crate) fn from_raw(
49        #[cfg(feature = "msquic-2-5")] handle: msquic::ffi::HQUIC,
50        #[cfg(not(feature = "msquic-2-5"))] msquic_conn: msquic::Connection,
51        tls_secrets: Option<Box<msquic::ffi::QUIC_TLS_SECRETS>>,
52        sslkeylog_file: Option<File>,
53    ) -> Self {
54        #[cfg(feature = "msquic-2-5")]
55        let msquic_conn = unsafe { msquic::Connection::from_raw(handle) };
56        let inner = Arc::new(ConnectionInner::new(
57            ConnectionState::Connected,
58            tls_secrets,
59            sslkeylog_file,
60        ));
61        let inner_in_ev = inner.clone();
62        msquic_conn.set_callback_handler(move |conn_ref, ev| {
63            inner_in_ev.callback_handler_impl(conn_ref, ev)
64        });
65        let instance = Arc::new(ConnectionInstance { inner, msquic_conn });
66        trace!(
67            "ConnectionInstance({:p}, Inner: {:p}) Open by peer",
68            instance,
69            instance.inner
70        );
71        Self(instance)
72    }
73
74    /// Start the connection.
75    pub fn start<'a>(
76        &'a self,
77        configuration: &'a msquic::Configuration,
78        host: &'a str,
79        port: u16,
80    ) -> ConnectionStart<'a> {
81        ConnectionStart {
82            conn: self,
83            configuration,
84            host,
85            port,
86        }
87    }
88
89    /// Poll to start the connection.
90    fn poll_start(
91        &self,
92        cx: &mut Context<'_>,
93        configuration: &msquic::Configuration,
94        host: &str,
95        port: u16,
96    ) -> Poll<Result<(), StartError>> {
97        let mut exclusive = self.0.exclusive.lock().unwrap();
98        match exclusive.state {
99            ConnectionState::Open => {
100                self.0
101                    .msquic_conn
102                    .start(configuration, host, port)
103                    .map_err(StartError::OtherError)?;
104                exclusive.state = ConnectionState::Connecting;
105            }
106            ConnectionState::Connecting => {}
107            ConnectionState::Connected => return Poll::Ready(Ok(())),
108            ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
109                return Poll::Ready(Err(StartError::ConnectionLost(
110                    exclusive.error.as_ref().expect("error").clone(),
111                )));
112            }
113        }
114        exclusive.start_waiters.push(cx.waker().clone());
115        Poll::Pending
116    }
117
118    /// Open a new outbound stream.
119    pub fn open_outbound_stream(
120        &self,
121        stream_type: StreamType,
122        fail_on_blocked: bool,
123    ) -> OpenOutboundStream<'_> {
124        OpenOutboundStream {
125            conn: &self.0,
126            stream_type: Some(stream_type),
127            stream: None,
128            fail_on_blocked,
129        }
130    }
131
132    /// Accept an inbound bidilectional stream.
133    pub fn accept_inbound_stream(&self) -> AcceptInboundStream<'_> {
134        AcceptInboundStream { conn: self }
135    }
136
137    /// Poll to accept an inbound bidilectional stream.
138    pub fn poll_accept_inbound_stream(
139        &self,
140        cx: &mut Context<'_>,
141    ) -> Poll<Result<Stream, StreamStartError>> {
142        let mut exclusive = self.0.exclusive.lock().unwrap();
143        match exclusive.state {
144            ConnectionState::Open => {
145                return Poll::Ready(Err(StreamStartError::ConnectionNotStarted));
146            }
147            ConnectionState::Connecting => {
148                exclusive.start_waiters.push(cx.waker().clone());
149                return Poll::Pending;
150            }
151            ConnectionState::Connected => {}
152            ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
153                return Poll::Ready(Err(StreamStartError::ConnectionLost(
154                    exclusive.error.as_ref().expect("error").clone(),
155                )));
156            }
157        }
158
159        if !exclusive.inbound_streams.is_empty() {
160            return Poll::Ready(Ok(exclusive.inbound_streams.pop_front().unwrap()));
161        }
162        exclusive.inbound_stream_waiters.push(cx.waker().clone());
163        Poll::Pending
164    }
165
166    /// Accept an inbound unidirectional stream.
167    pub fn accept_inbound_uni_stream(&self) -> AcceptInboundUniStream<'_> {
168        AcceptInboundUniStream { conn: self }
169    }
170
171    /// Poll to accept an inbound unidirectional stream.
172    pub fn poll_accept_inbound_uni_stream(
173        &self,
174        cx: &mut Context<'_>,
175    ) -> Poll<Result<ReadStream, StreamStartError>> {
176        let mut exclusive = self.0.exclusive.lock().unwrap();
177        match exclusive.state {
178            ConnectionState::Open => {
179                return Poll::Ready(Err(StreamStartError::ConnectionNotStarted));
180            }
181            ConnectionState::Connecting => {
182                exclusive.start_waiters.push(cx.waker().clone());
183                return Poll::Pending;
184            }
185            ConnectionState::Connected => {}
186            ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
187                return Poll::Ready(Err(StreamStartError::ConnectionLost(
188                    exclusive.error.as_ref().expect("error").clone(),
189                )));
190            }
191        }
192
193        if !exclusive.inbound_uni_streams.is_empty() {
194            return Poll::Ready(Ok(exclusive.inbound_uni_streams.pop_front().unwrap()));
195        }
196        exclusive
197            .inbound_uni_stream_waiters
198            .push(cx.waker().clone());
199        Poll::Pending
200    }
201
202    /// Poll to receive a datagram.
203    pub fn poll_receive_datagram(
204        &self,
205        cx: &mut Context<'_>,
206    ) -> Poll<Result<Bytes, DgramReceiveError>> {
207        let mut exclusive = self.0.exclusive.lock().unwrap();
208        match exclusive.state {
209            ConnectionState::Open => {
210                return Poll::Ready(Err(DgramReceiveError::ConnectionNotStarted));
211            }
212            ConnectionState::Connecting => {
213                exclusive.start_waiters.push(cx.waker().clone());
214                return Poll::Pending;
215            }
216            ConnectionState::Connected => {}
217            ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
218                return Poll::Ready(Err(DgramReceiveError::ConnectionLost(
219                    exclusive.error.as_ref().expect("error").clone(),
220                )));
221            }
222        }
223
224        if let Some(buf) = exclusive.recv_buffers.pop_front() {
225            Poll::Ready(Ok(buf))
226        } else {
227            exclusive.recv_waiters.push(cx.waker().clone());
228            Poll::Pending
229        }
230    }
231
232    /// Poll to send a datagram.
233    pub fn poll_send_datagram(
234        &self,
235        cx: &mut Context<'_>,
236        buf: &Bytes,
237    ) -> Poll<Result<(), DgramSendError>> {
238        let mut exclusive = self.0.exclusive.lock().unwrap();
239        match exclusive.state {
240            ConnectionState::Open => {
241                return Poll::Ready(Err(DgramSendError::ConnectionNotStarted));
242            }
243            ConnectionState::Connecting => {
244                exclusive.start_waiters.push(cx.waker().clone());
245                return Poll::Pending;
246            }
247            ConnectionState::Connected => {}
248            ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
249                return Poll::Ready(Err(DgramSendError::ConnectionLost(
250                    exclusive.error.as_ref().expect("error").clone(),
251                )));
252            }
253        }
254
255        let mut write_buf = exclusive.write_pool.pop().unwrap_or(WriteBuffer::new());
256        let _ = write_buf.put_zerocopy(buf);
257        let buffers = unsafe {
258            let (data, len) = write_buf.get_buffers();
259            std::slice::from_raw_parts(data, len)
260        };
261        let res = unsafe {
262            self.0.msquic_conn.datagram_send(
263                buffers,
264                msquic::SendFlags::NONE,
265                write_buf.into_raw() as *const _,
266            )
267        }
268        .map_err(DgramSendError::OtherError);
269        Poll::Ready(res)
270    }
271
272    /// Send a datagram.
273    pub fn send_datagram(&self, buf: &Bytes) -> Result<(), DgramSendError> {
274        let mut exclusive = self.0.exclusive.lock().unwrap();
275        match exclusive.state {
276            ConnectionState::Open => {
277                return Err(DgramSendError::ConnectionNotStarted);
278            }
279            ConnectionState::Connecting => {
280                return Err(DgramSendError::ConnectionNotStarted);
281            }
282            ConnectionState::Connected => {}
283            ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
284                return Err(DgramSendError::ConnectionLost(
285                    exclusive.error.as_ref().expect("error").clone(),
286                ));
287            }
288        }
289
290        if !exclusive.dgram_send_enabled {
291            return Err(DgramSendError::Denied);
292        }
293        if buf.len() > exclusive.dgram_max_send_length as usize {
294            return Err(DgramSendError::TooBig);
295        }
296
297        let mut write_buf = exclusive.write_pool.pop().unwrap_or(WriteBuffer::new());
298        let _ = write_buf.put_zerocopy(buf);
299        let buffers = unsafe {
300            let (data, len) = write_buf.get_buffers();
301            std::slice::from_raw_parts(data, len)
302        };
303        unsafe {
304            self.0.msquic_conn.datagram_send(
305                buffers,
306                msquic::SendFlags::NONE,
307                write_buf.into_raw() as *const _,
308            )
309        }
310        .map_err(DgramSendError::OtherError)?;
311        Ok(())
312    }
313
314    /// Poll to shutdown the connection.
315    pub fn poll_shutdown(
316        &self,
317        cx: &mut Context<'_>,
318        error_code: u64,
319    ) -> Poll<Result<(), ShutdownError>> {
320        let mut exclusive = self.0.exclusive.lock().unwrap();
321        match exclusive.state {
322            ConnectionState::Open => {
323                return Poll::Ready(Err(ShutdownError::ConnectionNotStarted));
324            }
325            ConnectionState::Connecting => {
326                exclusive.start_waiters.push(cx.waker().clone());
327                return Poll::Pending;
328            }
329            ConnectionState::Connected => {
330                self.0
331                    .msquic_conn
332                    .shutdown(msquic::ConnectionShutdownFlags::NONE, error_code);
333                exclusive.state = ConnectionState::Shutdown;
334            }
335            ConnectionState::Shutdown => {}
336            ConnectionState::ShutdownComplete => {
337                if let Some(ConnectionError::ShutdownByLocal) = &exclusive.error {
338                    return Poll::Ready(Ok(()));
339                } else {
340                    return Poll::Ready(Err(ShutdownError::ConnectionLost(
341                        exclusive.error.as_ref().expect("error").clone(),
342                    )));
343                }
344            }
345        }
346
347        exclusive.shutdown_waiters.push(cx.waker().clone());
348        Poll::Pending
349    }
350
351    /// Shutdown the connection.
352    pub fn shutdown(&self, error_code: u64) -> Result<(), ShutdownError> {
353        let mut exclusive = self.0.exclusive.lock().unwrap();
354        match exclusive.state {
355            ConnectionState::Open | ConnectionState::Connecting => {
356                return Err(ShutdownError::ConnectionNotStarted);
357            }
358            ConnectionState::Connected => {
359                self.0
360                    .msquic_conn
361                    .shutdown(msquic::ConnectionShutdownFlags::NONE, error_code);
362                exclusive.state = ConnectionState::Shutdown;
363            }
364            _ => {}
365        }
366        Ok(())
367    }
368
369    /// Get the local address of the connection.
370    pub fn get_local_addr(&self) -> Result<SocketAddr, ConnectionError> {
371        self.0
372            .msquic_conn
373            .get_local_addr()
374            .map(|addr| addr.as_socket().expect("socket addr"))
375            .map_err(ConnectionError::OtherError)
376    }
377
378    /// Get the remote address of the connection.
379    pub fn get_remote_addr(&self) -> Result<SocketAddr, ConnectionError> {
380        self.0
381            .msquic_conn
382            .get_remote_addr()
383            .map(|addr| addr.as_socket().expect("socket addr"))
384            .map_err(ConnectionError::OtherError)
385    }
386
387    /// Set whether to share the UDP binding.
388    pub fn set_share_binding(&self, share: bool) -> Result<(), ConnectionError> {
389        let share: u8 = if share { 1 } else { 0 };
390        unsafe {
391            msquic::Api::set_param(
392                self.0.msquic_conn.as_raw(),
393                msquic::ffi::QUIC_PARAM_CONN_SHARE_UDP_BINDING,
394                std::mem::size_of::<u8>() as u32,
395                &share as *const _ as *const _,
396            )
397        }
398        .map_err(ConnectionError::OtherError)
399    }
400
401    /// Add a new path to the connection.
402    #[cfg(feature = "msquic-seera")]
403    pub fn add_path(
404        &self,
405        local_addr: SocketAddr,
406        remote_addr: SocketAddr,
407    ) -> Result<(), ConnectionError> {
408        unsafe {
409            msquic::Api::set_param(
410                self.0.msquic_conn.as_raw(),
411                msquic::ffi::QUIC_PARAM_CONN_ADD_PATH,
412                std::mem::size_of::<msquic::ffi::QUIC_PATH_PARAM>() as u32,
413                &msquic::ffi::QUIC_PATH_PARAM {
414                    LocalAddress: &mut msquic::Addr::from(local_addr) as *mut _ as *mut _,
415                    RemoteAddress: &mut msquic::Addr::from(remote_addr) as *mut _ as *mut _,
416                } as *const _ as *const _,
417            )
418        }
419        .map_err(ConnectionError::OtherError)
420    }
421
422    /// Activate a path for the connection.
423    #[cfg(feature = "msquic-seera")]
424    pub fn activate_path(
425        &self,
426        local_addr: SocketAddr,
427        remote_addr: SocketAddr,
428    ) -> Result<(), ConnectionError> {
429        unsafe {
430            msquic::Api::set_param(
431                self.0.msquic_conn.as_raw(),
432                msquic::ffi::QUIC_PARAM_CONN_ACTIVATE_PATH,
433                std::mem::size_of::<msquic::ffi::QUIC_PATH_PARAM>() as u32,
434                &msquic::ffi::QUIC_PATH_PARAM {
435                    LocalAddress: &mut msquic::Addr::from(local_addr) as *mut _ as *mut _,
436                    RemoteAddress: &mut msquic::Addr::from(remote_addr) as *mut _ as *mut _,
437                } as *const _ as *const _,
438            )
439        }
440        .map_err(ConnectionError::OtherError)
441    }
442
443    /// Remove a path from the connection.
444    #[cfg(feature = "msquic-seera")]
445    pub fn remove_path(
446        &self,
447        local_addr: SocketAddr,
448        remote_addr: SocketAddr,
449    ) -> Result<(), ConnectionError> {
450        unsafe {
451            msquic::Api::set_param(
452                self.0.msquic_conn.as_raw(),
453                msquic::ffi::QUIC_PARAM_CONN_REMOVE_PATH,
454                std::mem::size_of::<msquic::ffi::QUIC_PATH_PARAM>() as u32,
455                &msquic::ffi::QUIC_PATH_PARAM {
456                    LocalAddress: &mut msquic::Addr::from(local_addr) as *mut _ as *mut _,
457                    RemoteAddress: &mut msquic::Addr::from(remote_addr) as *mut _ as *mut _,
458                } as *const _ as *const _,
459            )
460        }
461        .map_err(ConnectionError::OtherError)
462    }
463
464    /// Add a bound address to the connection.
465    #[cfg(feature = "msquic-seera")]
466    pub fn add_bound_addr(&self, addr: SocketAddr) -> Result<(), ConnectionError> {
467        unsafe {
468            msquic::Api::set_param(
469                self.0.msquic_conn.as_raw(),
470                msquic::ffi::QUIC_PARAM_CONN_ADD_BOUND_ADDRESS,
471                std::mem::size_of::<msquic::Addr>() as u32,
472                &msquic::Addr::from(addr) as *const _ as *const _,
473            )
474        }
475        .map_err(ConnectionError::OtherError)
476    }
477
478    /// Add an observed address to the connection.
479    #[cfg(feature = "msquic-seera")]
480    pub fn add_observed_addr(
481        &self,
482        addr: SocketAddr,
483        observed_addr: SocketAddr,
484    ) -> Result<(), ConnectionError> {
485        unsafe {
486            msquic::Api::set_param(
487                self.0.msquic_conn.as_raw(),
488                msquic::ffi::QUIC_PARAM_CONN_ADD_OBSERVED_ADDRESS,
489                std::mem::size_of::<msquic::ffi::QUIC_ADD_OBSERVED_ADDRESS>() as u32,
490                &msquic::ffi::QUIC_ADD_OBSERVED_ADDRESS {
491                    LocalAddress: &mut msquic::Addr::from(addr) as *mut _ as *mut _,
492                    ObservedAddress: &mut msquic::Addr::from(observed_addr) as *mut _ as *mut _,
493                } as *const _ as *const _,
494            )
495        }
496        .map_err(ConnectionError::OtherError)
497    }
498
499    /// Remove a bound address from the connection.
500    #[cfg(feature = "msquic-seera")]
501    pub fn remove_bound_addr(&self, addr: SocketAddr) -> Result<(), ConnectionError> {
502        unsafe {
503            msquic::Api::set_param(
504                self.0.msquic_conn.as_raw(),
505                msquic::ffi::QUIC_PARAM_CONN_REMOVE_BOUND_ADDRESS,
506                std::mem::size_of::<msquic::Addr>() as u32,
507                &msquic::Addr::from(addr) as *const _ as *const _,
508            )
509        }
510        .map_err(ConnectionError::OtherError)
511    }
512
513    /// Add a candidate address to the connection.
514    #[cfg(feature = "msquic-seera")]
515    pub fn add_candidate_addr(
516        &self,
517        host_addr: SocketAddr,
518        observed_addr: SocketAddr,
519    ) -> Result<(), ConnectionError> {
520        unsafe {
521            msquic::Api::set_param(
522                self.0.msquic_conn.as_raw(),
523                msquic::ffi::QUIC_PARAM_CONN_ADD_CANDIDATE_ADDRESS,
524                std::mem::size_of::<msquic::ffi::QUIC_CANDIDATE_ADDRESS>() as u32,
525                &msquic::ffi::QUIC_CANDIDATE_ADDRESS {
526                    HostAddress: &mut msquic::Addr::from(host_addr) as *mut _ as *mut _,
527                    ObservedAddress: &mut msquic::Addr::from(observed_addr) as *mut _ as *mut _,
528                } as *const _ as *const _,
529            )
530        }
531        .map_err(ConnectionError::OtherError)
532    }
533
534    /// Remove a candidate address from the connection.
535    #[cfg(feature = "msquic-seera")]
536    pub fn remove_candidate_addr(
537        &self,
538        host_addr: SocketAddr,
539        observed_addr: SocketAddr,
540    ) -> Result<(), ConnectionError> {
541        unsafe {
542            msquic::Api::set_param(
543                self.0.msquic_conn.as_raw(),
544                msquic::ffi::QUIC_PARAM_CONN_REMOVE_CANDIDATE_ADDRESS,
545                std::mem::size_of::<msquic::ffi::QUIC_CANDIDATE_ADDRESS>() as u32,
546                &msquic::ffi::QUIC_CANDIDATE_ADDRESS {
547                    HostAddress: &mut msquic::Addr::from(host_addr) as *mut _ as *mut _,
548                    ObservedAddress: &mut msquic::Addr::from(observed_addr) as *mut _ as *mut _,
549                } as *const _ as *const _,
550            )
551        }
552        .map_err(ConnectionError::OtherError)
553    }
554
555    /// Poll to receive events on the connection.
556    pub fn poll_event(&self, cx: &mut Context<'_>) -> Poll<Result<ConnectionEvent, EventError>> {
557        let mut exclusive = self.0.exclusive.lock().unwrap();
558        match exclusive.state {
559            ConnectionState::Open => {
560                return Poll::Ready(Err(EventError::ConnectionNotStarted));
561            }
562            ConnectionState::Connecting => {
563                exclusive.start_waiters.push(cx.waker().clone());
564                return Poll::Pending;
565            }
566            ConnectionState::Connected | ConnectionState::Shutdown => {}
567            ConnectionState::ShutdownComplete => {
568                return Poll::Ready(Err(EventError::ConnectionLost(
569                    exclusive.error.as_ref().expect("error").clone(),
570                )));
571            }
572        }
573
574        if exclusive.events.is_empty() {
575            exclusive.event_waiters.push(cx.waker().clone());
576            Poll::Pending
577        } else {
578            Poll::Ready(Ok(exclusive.events.pop_front().unwrap()))
579        }
580    }
581
582    /// Set the SSL key log file for the connection.
583    pub fn set_sslkeylog_file(&self, file: File) -> Result<(), ConnectionError> {
584        let mut exclusive = self.0.exclusive.lock().unwrap();
585        if exclusive.sslkeylog_file.is_some() {
586            return Err(ConnectionError::SslKeyLogFileAlreadySet);
587        }
588        if exclusive.tls_secrets.is_none() {
589            exclusive.tls_secrets = Some(Box::new(msquic::ffi::QUIC_TLS_SECRETS {
590                SecretLength: 0,
591                ClientRandom: [0; 32],
592                IsSet: QUIC_TLS_SECRETS__bindgen_ty_1 {
593                    _bitfield_align_1: [0; 0],
594                    _bitfield_1: QUIC_TLS_SECRETS__bindgen_ty_1::new_bitfield_1(
595                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
596                    ),
597                },
598                ClientEarlyTrafficSecret: [0; 64],
599                ClientHandshakeTrafficSecret: [0; 64],
600                ServerHandshakeTrafficSecret: [0; 64],
601                ClientTrafficSecret0: [0; 64],
602                ServerTrafficSecret0: [0; 64],
603            }));
604            unsafe {
605                msquic::Api::set_param(
606                    self.0.msquic_conn.as_raw(),
607                    msquic::ffi::QUIC_PARAM_CONN_TLS_SECRETS,
608                    std::mem::size_of::<msquic::ffi::QUIC_TLS_SECRETS>() as u32,
609                    exclusive.tls_secrets.as_ref().unwrap().as_ref() as *const _ as *const _,
610                )
611            }
612            .map_err(ConnectionError::OtherError)?;
613        }
614        exclusive.sslkeylog_file = Some(file);
615        Ok(())
616    }
617}
618
/// Pairs the shared connection state with the msquic connection object.
struct ConnectionInstance {
    /// State shared with the msquic event callback.
    inner: Arc<ConnectionInner>,
    /// The underlying msquic connection.
    msquic_conn: msquic::Connection,
}
623
624impl Deref for ConnectionInstance {
625    type Target = ConnectionInner;
626
627    fn deref(&self) -> &Self::Target {
628        &self.inner
629    }
630}
631
/// Emits a trace message when the instance is dropped.
impl Drop for ConnectionInstance {
    fn drop(&mut self) {
        // Only logs; the fields are dropped by the compiler-generated drop glue afterwards.
        trace!("ConnectionInstance({:p}) dropping", self);
    }
}
637
/// Connection state shared between `Connection` handles and the msquic event callback.
struct ConnectionInner {
    /// All mutable state, guarded by a single mutex.
    exclusive: Mutex<ConnectionInnerExclusive>,
}
641
/// Mutable connection state; only accessed while holding the `ConnectionInner` mutex.
struct ConnectionInnerExclusive {
    /// Current lifecycle state of the connection.
    state: ConnectionState,
    /// Reason the connection was shut down, once one has been recorded.
    error: Option<ConnectionError>,
    /// Wakers parked while the connection is `Connecting`.
    start_waiters: Vec<Waker>,
    /// Wakers parked in `poll_accept_inbound_stream` while no stream is queued.
    inbound_stream_waiters: Vec<Waker>,
    /// Wakers parked in `poll_accept_inbound_uni_stream` while no stream is queued.
    inbound_uni_stream_waiters: Vec<Waker>,
    /// Queue of streams handed out by `poll_accept_inbound_stream`.
    inbound_streams: VecDeque<crate::stream::Stream>,
    /// Queue of streams handed out by `poll_accept_inbound_uni_stream`.
    inbound_uni_streams: VecDeque<crate::stream::ReadStream>,
    /// Queue of datagrams handed out by `poll_receive_datagram`.
    recv_buffers: VecDeque<Bytes>,
    /// Wakers parked in `poll_receive_datagram` while no datagram is queued.
    recv_waiters: Vec<Waker>,
    /// Pool of write buffers reused by the datagram send methods.
    write_pool: Vec<WriteBuffer>,
    /// Whether datagrams may be sent; `send_datagram` fails with `Denied` when false.
    dgram_send_enabled: bool,
    /// Largest datagram length accepted by `send_datagram`.
    dgram_max_send_length: u16,
    /// Wakers parked in `poll_shutdown` until shutdown completes.
    shutdown_waiters: Vec<Waker>,
    /// Queue of events handed out by `poll_event`.
    events: VecDeque<ConnectionEvent>,
    /// Wakers parked in `poll_event` while no event is queued.
    event_waiters: Vec<Waker>,
    /// File the TLS secrets are written to when the connection is established.
    sslkeylog_file: Option<File>,
    /// Buffer registered with msquic to receive the TLS secrets.
    tls_secrets: Option<Box<msquic::ffi::QUIC_TLS_SECRETS>>,
}
661
662impl ConnectionInner {
663    fn new(
664        state: ConnectionState,
665        tls_secrets: Option<Box<msquic::ffi::QUIC_TLS_SECRETS>>,
666        sslkeylog_file: Option<File>,
667    ) -> Self {
668        Self {
669            exclusive: Mutex::new(ConnectionInnerExclusive {
670                state,
671                error: None,
672                start_waiters: Vec::new(),
673                inbound_stream_waiters: Vec::new(),
674                inbound_uni_stream_waiters: Vec::new(),
675                inbound_streams: VecDeque::new(),
676                inbound_uni_streams: VecDeque::new(),
677                recv_buffers: VecDeque::new(),
678                recv_waiters: Vec::new(),
679                write_pool: Vec::new(),
680                dgram_send_enabled: false,
681                dgram_max_send_length: 0,
682                shutdown_waiters: Vec::new(),
683                events: VecDeque::new(),
684                event_waiters: Vec::new(),
685                sslkeylog_file,
686                tls_secrets,
687            }),
688        }
689    }
690
691    fn handle_event_connected(
692        &self,
693        _session_resumed: bool,
694        _negotiated_alpn: &[u8],
695    ) -> Result<(), msquic::Status> {
696        trace!("ConnectionInner({:p}) Connected", self);
697
698        let mut exclusive = self.exclusive.lock().unwrap();
699        match (
700            exclusive.tls_secrets.take(),
701            exclusive.sslkeylog_file.take(),
702        ) {
703            (Some(tls_secrets), Some(mut file)) => {
704                info!("ConnectionInner({:p}) Writing TLS secrets to file", self);
705                let client_random = if tls_secrets.IsSet.ClientRandom() != 0 {
706                    hex::encode(tls_secrets.ClientRandom)
707                } else {
708                    String::new()
709                };
710
711                let _ = file.seek(SeekFrom::End(0));
712
713                if tls_secrets.IsSet.ClientEarlyTrafficSecret() != 0 {
714                    let _ = writeln!(
715                        file,
716                        "CLIENT_EARLY_TRAFFIC_SECRET {} {}",
717                        client_random,
718                        hex::encode(
719                            &tls_secrets.ClientEarlyTrafficSecret
720                                [0..tls_secrets.SecretLength as usize]
721                        )
722                    );
723                }
724
725                if tls_secrets.IsSet.ClientHandshakeTrafficSecret() != 0 {
726                    let _ = writeln!(
727                        file,
728                        "CLIENT_HANDSHAKE_TRAFFIC_SECRET {} {}",
729                        client_random,
730                        hex::encode(
731                            &tls_secrets.ClientHandshakeTrafficSecret
732                                [0..tls_secrets.SecretLength as usize]
733                        )
734                    );
735                }
736
737                if tls_secrets.IsSet.ServerHandshakeTrafficSecret() != 0 {
738                    let _ = writeln!(
739                        file,
740                        "SERVER_HANDSHAKE_TRAFFIC_SECRET {} {}",
741                        client_random,
742                        hex::encode(
743                            &tls_secrets.ServerHandshakeTrafficSecret
744                                [0..tls_secrets.SecretLength as usize]
745                        )
746                    );
747                }
748
749                if tls_secrets.IsSet.ClientTrafficSecret0() != 0 {
750                    let _ = writeln!(
751                        file,
752                        "CLIENT_TRAFFIC_SECRET_0 {} {}",
753                        client_random,
754                        hex::encode(
755                            &tls_secrets.ClientTrafficSecret0[0..tls_secrets.SecretLength as usize]
756                        )
757                    );
758                }
759
760                if tls_secrets.IsSet.ServerTrafficSecret0() != 0 {
761                    let _ = writeln!(
762                        file,
763                        "SERVER_TRAFFIC_SECRET_0 {} {}",
764                        client_random,
765                        hex::encode(
766                            &tls_secrets.ServerTrafficSecret0[0..tls_secrets.SecretLength as usize]
767                        )
768                    );
769                }
770                exclusive.tls_secrets = Some(tls_secrets);
771            }
772            _ => { /* do nothing */ }
773        }
774        exclusive.state = ConnectionState::Connected;
775        exclusive
776            .start_waiters
777            .drain(..)
778            .for_each(|waker| waker.wake());
779        Ok(())
780    }
781
782    fn handle_event_shutdown_initiated_by_transport(
783        &self,
784        status: msquic::Status,
785        error_code: u64,
786    ) -> Result<(), msquic::Status> {
787        trace!(
788            "ConnectionInner({:p}) Transport shutdown {:?}",
789            self,
790            status
791        );
792
793        let mut exclusive = self.exclusive.lock().unwrap();
794        exclusive.state = ConnectionState::Shutdown;
795        exclusive.error = Some(ConnectionError::ShutdownByTransport(status, error_code));
796        exclusive
797            .start_waiters
798            .drain(..)
799            .for_each(|waker| waker.wake());
800        exclusive
801            .inbound_stream_waiters
802            .drain(..)
803            .for_each(|waker| waker.wake());
804        Ok(())
805    }
806
807    fn handle_event_shutdown_initiated_by_peer(
808        &self,
809        error_code: u64,
810    ) -> Result<(), msquic::Status> {
811        trace!("ConnectionInner({:p}) App shutdown {}", self, error_code);
812
813        let mut exclusive = self.exclusive.lock().unwrap();
814        exclusive.state = ConnectionState::Shutdown;
815        exclusive.error = Some(ConnectionError::ShutdownByPeer(error_code));
816        exclusive
817            .start_waiters
818            .drain(..)
819            .for_each(|waker| waker.wake());
820        exclusive
821            .inbound_stream_waiters
822            .drain(..)
823            .for_each(|waker| waker.wake());
824        Ok(())
825    }
826
827    fn handle_event_shutdown_complete(
828        &self,
829        handshake_completed: bool,
830        peer_acknowledged_shutdown: bool,
831        app_close_in_progress: bool,
832    ) -> Result<(), msquic::Status> {
833        trace!("ConnectionInner({:p}) Shutdown complete: handshake_completed={}, peer_acknowledged_shutdown={}, app_close_in_progress={}",
834            self, handshake_completed, peer_acknowledged_shutdown, app_close_in_progress
835        );
836
837        {
838            let mut exclusive = self.exclusive.lock().unwrap();
839            exclusive.state = ConnectionState::ShutdownComplete;
840            if exclusive.error.is_none() {
841                exclusive.error = Some(ConnectionError::ShutdownByLocal);
842            }
843            exclusive
844                .start_waiters
845                .drain(..)
846                .for_each(|waker| waker.wake());
847            exclusive
848                .inbound_stream_waiters
849                .drain(..)
850                .for_each(|waker| waker.wake());
851            exclusive
852                .shutdown_waiters
853                .drain(..)
854                .for_each(|waker| waker.wake());
855            exclusive
856                .event_waiters
857                .drain(..)
858                .for_each(|waker| waker.wake());
859        }
860        Ok(())
861    }
862
863    fn handle_event_peer_stream_started(
864        &self,
865        stream: msquic::StreamRef,
866        flags: msquic::StreamOpenFlags,
867    ) -> Result<(), msquic::Status> {
868        let stream_type = if (flags & msquic::StreamOpenFlags::UNIDIRECTIONAL)
869            == msquic::StreamOpenFlags::UNIDIRECTIONAL
870        {
871            StreamType::Unidirectional
872        } else {
873            StreamType::Bidirectional
874        };
875        trace!(
876            "ConnectionInner({:p}) Peer stream started {:?}",
877            self,
878            stream_type
879        );
880
881        let stream = Stream::from_raw(unsafe { stream.as_raw() }, stream_type);
882        if (flags & msquic::StreamOpenFlags::UNIDIRECTIONAL)
883            == msquic::StreamOpenFlags::UNIDIRECTIONAL
884        {
885            if let (Some(read_stream), None) = stream.split() {
886                let mut exclusive = self.exclusive.lock().unwrap();
887                exclusive.inbound_uni_streams.push_back(read_stream);
888                exclusive
889                    .inbound_uni_stream_waiters
890                    .drain(..)
891                    .for_each(|waker| waker.wake());
892            } else {
893                unreachable!();
894            }
895        } else {
896            {
897                let mut exclusive = self.exclusive.lock().unwrap();
898                exclusive.inbound_streams.push_back(stream);
899                exclusive
900                    .inbound_stream_waiters
901                    .drain(..)
902                    .for_each(|waker| waker.wake());
903            }
904        }
905
906        Ok(())
907    }
908
909    fn handle_event_streams_available(
910        &self,
911        bidirectional_count: u16,
912        unidirectional_count: u16,
913    ) -> Result<(), msquic::Status> {
914        trace!(
915            "ConnectionInner({:p}) Streams available bidirectional_count:{} unidirectional_count:{}",
916            self,
917            bidirectional_count,
918            unidirectional_count
919        );
920        Ok(())
921    }
922
923    fn handle_event_datagram_state_changed(
924        &self,
925        send_enabled: bool,
926        max_send_length: u16,
927    ) -> Result<(), msquic::Status> {
928        trace!(
929            "ConnectionInner({:p}) Datagram state changed send_enabled:{} max_send_length:{}",
930            self,
931            send_enabled,
932            max_send_length
933        );
934        let mut exclusive = self.exclusive.lock().unwrap();
935        exclusive.dgram_send_enabled = send_enabled;
936        exclusive.dgram_max_send_length = max_send_length;
937        Ok(())
938    }
939
940    fn handle_event_datagram_received(
941        &self,
942        buffer: &msquic::BufferRef,
943        _flags: msquic::ReceiveFlags,
944    ) -> Result<(), msquic::Status> {
945        trace!("ConnectionInner({:p}) Datagram received", self);
946        let buf = Bytes::copy_from_slice(buffer.as_bytes());
947        {
948            let mut exclusive = self.exclusive.lock().unwrap();
949            exclusive.recv_buffers.push_back(buf);
950            exclusive
951                .recv_waiters
952                .drain(..)
953                .for_each(|waker| waker.wake());
954        }
955        Ok(())
956    }
957
958    fn handle_event_datagram_send_state_changed(
959        &self,
960        client_context: *const c_void,
961        state: msquic::DatagramSendState,
962    ) -> Result<(), msquic::Status> {
963        trace!(
964            "ConnectionInner({:p}) Datagram send state changed state:{:?}",
965            self,
966            state
967        );
968        match state {
969            msquic::DatagramSendState::Sent | msquic::DatagramSendState::Canceled => {
970                let mut write_buf = unsafe { WriteBuffer::from_raw(client_context) };
971                let mut exclusive = self.exclusive.lock().unwrap();
972                write_buf.reset();
973                exclusive.write_pool.push(write_buf);
974            }
975            _ => {}
976        }
977        Ok(())
978    }
979
980    #[cfg(feature = "msquic-seera")]
981    fn handle_event_notify_observed_address(
982        &self,
983        local_address: &msquic::Addr,
984        observed_address: &msquic::Addr,
985    ) -> Result<(), msquic::Status> {
986        let local_address = local_address.as_socket().expect("socket addr");
987        let observed_address = observed_address.as_socket().expect("socket addr");
988        trace!(
989            "ConnectionInner({:p}) Notify observed address local_address:{} observed_address:{}",
990            self,
991            local_address,
992            observed_address
993        );
994        let mut exclusive = self.exclusive.lock().unwrap();
995        exclusive
996            .events
997            .push_back(ConnectionEvent::NotifyObservedAddress {
998                local_address,
999                observed_address,
1000            });
1001        exclusive
1002            .event_waiters
1003            .drain(..)
1004            .for_each(|waker| waker.wake());
1005        Ok(())
1006    }
1007
1008    #[cfg(feature = "msquic-seera")]
1009    fn handle_event_notify_remote_address_added(
1010        &self,
1011        address: &msquic::Addr,
1012        sequence_number: u64,
1013    ) -> Result<(), msquic::Status> {
1014        let address = address.as_socket().expect("socket addr");
1015        trace!(
1016            "ConnectionInner({:p}) Notify remote address added address:{} sequence_number:{}",
1017            self,
1018            address,
1019            sequence_number
1020        );
1021        let mut exclusive = self.exclusive.lock().unwrap();
1022        exclusive
1023            .events
1024            .push_back(ConnectionEvent::NotifyRemoteAddressAdded {
1025                address,
1026                sequence_number,
1027            });
1028        exclusive
1029            .event_waiters
1030            .drain(..)
1031            .for_each(|waker| waker.wake());
1032        Ok(())
1033    }
1034
1035    #[cfg(feature = "msquic-seera")]
1036    fn handle_event_path_validated(
1037        &self,
1038        local_address: &msquic::Addr,
1039        remote_address: &msquic::Addr,
1040    ) -> Result<(), msquic::Status> {
1041        let local_address = local_address.as_socket().expect("socket addr");
1042        let remote_address = remote_address.as_socket().expect("socket addr");
1043        trace!(
1044            "ConnectionInner({:p}) path validated local_address:{} remote_address:{}",
1045            self,
1046            local_address,
1047            remote_address
1048        );
1049        let mut exclusive = self.exclusive.lock().unwrap();
1050        exclusive.events.push_back(ConnectionEvent::PathValidated {
1051            local_address,
1052            remote_address,
1053        });
1054        exclusive
1055            .event_waiters
1056            .drain(..)
1057            .for_each(|waker| waker.wake());
1058        Ok(())
1059    }
1060
1061    #[cfg(feature = "msquic-seera")]
1062    fn handle_event_notify_remote_address_removed(
1063        &self,
1064        sequence_number: u64,
1065    ) -> Result<(), msquic::Status> {
1066        trace!(
1067            "ConnectionInner({:p}) Notify remote address removed sequence_number:{}",
1068            self,
1069            sequence_number
1070        );
1071        let mut exclusive = self.exclusive.lock().unwrap();
1072        exclusive
1073            .events
1074            .push_back(ConnectionEvent::NotifyRemoteAddressRemoved { sequence_number });
1075        exclusive
1076            .event_waiters
1077            .drain(..)
1078            .for_each(|waker| waker.wake());
1079        Ok(())
1080    }
1081
1082    fn callback_handler_impl(
1083        &self,
1084        _connection: msquic::ConnectionRef,
1085        ev: msquic::ConnectionEvent,
1086    ) -> Result<(), msquic::Status> {
1087        match ev {
1088            msquic::ConnectionEvent::Connected {
1089                session_resumed,
1090                negotiated_alpn,
1091            } => self.handle_event_connected(session_resumed, negotiated_alpn),
1092            msquic::ConnectionEvent::ShutdownInitiatedByTransport { status, error_code } => {
1093                self.handle_event_shutdown_initiated_by_transport(status, error_code)
1094            }
1095            msquic::ConnectionEvent::ShutdownInitiatedByPeer { error_code } => {
1096                self.handle_event_shutdown_initiated_by_peer(error_code)
1097            }
1098            msquic::ConnectionEvent::ShutdownComplete {
1099                handshake_completed,
1100                peer_acknowledged_shutdown,
1101                app_close_in_progress,
1102            } => self.handle_event_shutdown_complete(
1103                handshake_completed,
1104                peer_acknowledged_shutdown,
1105                app_close_in_progress,
1106            ),
1107            msquic::ConnectionEvent::PeerStreamStarted { stream, flags } => {
1108                self.handle_event_peer_stream_started(stream, flags)
1109            }
1110            msquic::ConnectionEvent::StreamsAvailable {
1111                bidirectional_count,
1112                unidirectional_count,
1113            } => self.handle_event_streams_available(bidirectional_count, unidirectional_count),
1114            msquic::ConnectionEvent::DatagramStateChanged {
1115                send_enabled,
1116                max_send_length,
1117            } => self.handle_event_datagram_state_changed(send_enabled, max_send_length),
1118            msquic::ConnectionEvent::DatagramReceived { buffer, flags } => {
1119                self.handle_event_datagram_received(buffer, flags)
1120            }
1121            msquic::ConnectionEvent::DatagramSendStateChanged {
1122                client_context,
1123                state,
1124            } => self.handle_event_datagram_send_state_changed(client_context, state),
1125            #[cfg(feature = "msquic-seera")]
1126            msquic::ConnectionEvent::NotifyObservedAddress {
1127                local_address,
1128                observed_address,
1129            } => self.handle_event_notify_observed_address(local_address, observed_address),
1130            #[cfg(feature = "msquic-seera")]
1131            msquic::ConnectionEvent::NotifyRemoteAddressAdded {
1132                address,
1133                sequence_number,
1134            } => self.handle_event_notify_remote_address_added(address, sequence_number),
1135            #[cfg(feature = "msquic-seera")]
1136            msquic::ConnectionEvent::PathValidated {
1137                local_address,
1138                remote_address,
1139            } => self.handle_event_path_validated(local_address, remote_address),
1140            #[cfg(feature = "msquic-seera")]
1141            msquic::ConnectionEvent::NotifyRemoteAddressRemoved { sequence_number } => {
1142                self.handle_event_notify_remote_address_removed(sequence_number)
1143            }
1144            _ => {
1145                trace!("ConnectionInner({:p}) Other callback", self);
1146                Ok(())
1147            }
1148        }
1149    }
1150}
1151impl Drop for ConnectionInner {
1152    fn drop(&mut self) {
1153        trace!("ConnectionInner({:p}) dropping", self);
1154    }
1155}
1156
/// Lifecycle state of a connection, as tracked under the connection lock.
///
/// The enum carries no data, so it is `Copy` and fully comparable (`Eq`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ConnectionState {
    /// Initial state of a connection created locally; stream opening is
    /// rejected with "connection not started" in this state.
    Open,
    /// The connection is being established; tasks wait on `start_waiters`.
    Connecting,
    /// Set by the connected-event handler.
    Connected,
    /// Set when the transport or the peer initiated a shutdown.
    Shutdown,
    /// Set by the shutdown-complete handler.
    ShutdownComplete,
}
1165
/// Events that can occur on a connection.
///
/// The connection callbacks queue these values and wake the tasks waiting
/// for events. The handlers visible in this file that produce them are
/// compiled only with the `msquic-seera` feature.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum ConnectionEvent {
    /// A new observed address has been detected.
    NotifyObservedAddress {
        /// Local address reported with the notification.
        local_address: SocketAddr,
        /// Observed address reported with the notification.
        observed_address: SocketAddr,
    },

    /// A new remote address has been added.
    NotifyRemoteAddressAdded {
        /// The remote address that was added.
        address: SocketAddr,
        /// Sequence number reported with the address.
        sequence_number: u64,
    },

    /// A path has been validated.
    PathValidated {
        /// Local address of the validated path.
        local_address: SocketAddr,
        /// Remote address of the validated path.
        remote_address: SocketAddr,
    },

    /// A remote address has been removed.
    NotifyRemoteAddressRemoved {
        /// Sequence number reported with the removal.
        sequence_number: u64,
    },
}
1187
1188/// Errors that can occur when managing a connection.
1189#[derive(Debug, Error, Clone)]
1190pub enum ConnectionError {
1191    #[error("connection shutdown by transport: status {0:?}, error 0x{1:x}")]
1192    ShutdownByTransport(msquic::Status, u64),
1193    #[error("connection shutdown by peer: error 0x{0:x}")]
1194    ShutdownByPeer(u64),
1195    #[error("connection shutdown by local")]
1196    ShutdownByLocal,
1197    #[error("connection closed")]
1198    ConnectionClosed,
1199    #[error("SSL key log file already set")]
1200    SslKeyLogFileAlreadySet,
1201    #[error("other error: status {0:?}")]
1202    OtherError(msquic::Status),
1203}
1204
1205/// Errors that can occur when receiving a datagram.
1206#[derive(Debug, Error, Clone)]
1207pub enum DgramReceiveError {
1208    #[error("connection not started yet")]
1209    ConnectionNotStarted,
1210    #[error("connection lost")]
1211    ConnectionLost(#[from] ConnectionError),
1212    #[error("other error: status {0:?}")]
1213    OtherError(msquic::Status),
1214}
1215
1216/// Errors that can occur when sending a datagram.
1217#[derive(Debug, Error, Clone)]
1218pub enum DgramSendError {
1219    #[error("connection not started yet")]
1220    ConnectionNotStarted,
1221    #[error("not allowed for sending dgram")]
1222    Denied,
1223    #[error("exceeded maximum data size for sending dgram")]
1224    TooBig,
1225    #[error("connection lost")]
1226    ConnectionLost(#[from] ConnectionError),
1227    #[error("other error: status {0:?}")]
1228    OtherError(msquic::Status),
1229}
1230
1231/// Errors that can occur when starting a connection.
1232#[derive(Debug, Error, Clone)]
1233pub enum StartError {
1234    #[error("connection lost")]
1235    ConnectionLost(#[from] ConnectionError),
1236    #[error("other error: status {0:?}")]
1237    OtherError(msquic::Status),
1238}
1239
1240/// Errors that can occur when shutdowning a connection.
1241#[derive(Debug, Error, Clone)]
1242pub enum ShutdownError {
1243    #[error("connection not started yet")]
1244    ConnectionNotStarted,
1245    #[error("connection lost")]
1246    ConnectionLost(#[from] ConnectionError),
1247    #[error("other error: status {0:?}")]
1248    OtherError(msquic::Status),
1249}
1250
1251/// Errors that can occur when receiving events on a connection.
1252#[derive(Debug, Error, Clone)]
1253pub enum EventError {
1254    #[error("connection not started yet")]
1255    ConnectionNotStarted,
1256    #[error("connection lost")]
1257    ConnectionLost(#[from] ConnectionError),
1258    #[error("other error: status {0:?}")]
1259    OtherError(msquic::Status),
1260}
1261
1262/// Future produced by [`Connection::start()`].
1263pub struct ConnectionStart<'a> {
1264    conn: &'a Connection,
1265    configuration: &'a msquic::Configuration,
1266    host: &'a str,
1267    port: u16,
1268}
1269
1270impl Future for ConnectionStart<'_> {
1271    type Output = Result<(), StartError>;
1272
1273    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1274        self.conn
1275            .poll_start(cx, self.configuration, self.host, self.port)
1276    }
1277}
1278
1279/// Future produced by [`Connection::open_outbound_stream()`].
1280pub struct OpenOutboundStream<'a> {
1281    conn: &'a ConnectionInstance,
1282    stream_type: Option<crate::stream::StreamType>,
1283    stream: Option<crate::stream::Stream>,
1284    fail_on_blocked: bool,
1285}
1286
1287impl Future for OpenOutboundStream<'_> {
1288    type Output = Result<crate::stream::Stream, StreamStartError>;
1289
1290    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1291        let this = self.get_mut();
1292        let OpenOutboundStream {
1293            conn,
1294            ref mut stream_type,
1295            ref mut stream,
1296            fail_on_blocked: fail_blocked,
1297            ..
1298        } = *this;
1299
1300        let mut exclusive = conn.inner.exclusive.lock().unwrap();
1301        match exclusive.state {
1302            ConnectionState::Open => {
1303                return Poll::Ready(Err(StreamStartError::ConnectionNotStarted));
1304            }
1305            ConnectionState::Connecting => {
1306                exclusive.start_waiters.push(cx.waker().clone());
1307                return Poll::Pending;
1308            }
1309            ConnectionState::Connected => {}
1310            ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
1311                return Poll::Ready(Err(StreamStartError::ConnectionLost(
1312                    exclusive.error.as_ref().expect("error").clone(),
1313                )));
1314            }
1315        }
1316        if stream.is_none() {
1317            match Stream::open(&conn.msquic_conn, stream_type.take().unwrap()) {
1318                Ok(new_stream) => {
1319                    *stream = Some(new_stream);
1320                }
1321                Err(e) => return Poll::Ready(Err(e)),
1322            }
1323        }
1324        stream
1325            .as_mut()
1326            .unwrap()
1327            .poll_start(cx, fail_blocked)
1328            .map(|res| res.map(|_| stream.take().unwrap()))
1329    }
1330}
1331
1332/// Future produced by [`Connection::accept_inbound_stream()`].
1333pub struct AcceptInboundStream<'a> {
1334    conn: &'a Connection,
1335}
1336
1337impl Future for AcceptInboundStream<'_> {
1338    type Output = Result<Stream, StreamStartError>;
1339
1340    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1341        self.conn.poll_accept_inbound_stream(cx)
1342    }
1343}
1344
1345/// Future produced by [`Connection::accept_inbound_uni_stream()`].
1346pub struct AcceptInboundUniStream<'a> {
1347    conn: &'a Connection,
1348}
1349
1350impl Future for AcceptInboundUniStream<'_> {
1351    type Output = Result<ReadStream, StreamStartError>;
1352
1353    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1354        self.conn.poll_accept_inbound_uni_stream(cx)
1355    }
1356}