msquic_async/
connection.rs

1use crate::buffer::WriteBuffer;
2use crate::stream::{ReadStream, StartError as StreamStartError, Stream, StreamType};
3
4use std::collections::VecDeque;
5use std::future::Future;
6use std::net::SocketAddr;
7use std::ops::Deref;
8use std::pin::Pin;
9use std::sync::{Arc, Mutex};
10use std::task::{Context, Poll, Waker};
11
12use bytes::Bytes;
13use libc::c_void;
14use thiserror::Error;
15use tracing::trace;
16
17#[derive(Clone)]
18pub struct Connection(Arc<ConnectionInstance>);
19
20impl Connection {
21    /// Create a new connection.
22    ///
23    /// The connection is not started until `start` is called.
24    pub fn new(registration: &msquic::Registration) -> Result<Self, ConnectionError> {
25        let inner = Arc::new(ConnectionInner::new(ConnectionState::Open));
26        let inner_in_ev = inner.clone();
27        let msquic_conn = msquic::Connection::open(registration, move |conn_ref, ev| {
28            inner_in_ev.callback_handler_impl(conn_ref, ev)
29        })
30        .map_err(ConnectionError::OtherError)?;
31        let instance = Arc::new(ConnectionInstance { inner, msquic_conn });
32        trace!(
33            "ConnectionInstance({:p}, Inner: {:p}) Open by local",
34            instance,
35            instance.inner
36        );
37        Ok(Self(instance))
38    }
39
40    pub(crate) fn from_raw(handle: msquic::ffi::HQUIC) -> Self {
41        let msquic_conn = unsafe { msquic::Connection::from_raw(handle) };
42        let inner = Arc::new(ConnectionInner::new(ConnectionState::Connected));
43        let inner_in_ev = inner.clone();
44        msquic_conn.set_callback_handler(move |conn_ref, ev| {
45            inner_in_ev.callback_handler_impl(conn_ref, ev)
46        });
47        let instance = Arc::new(ConnectionInstance { inner, msquic_conn });
48        trace!(
49            "ConnectionInstance({:p}, Inner: {:p}) Open by peer",
50            instance,
51            instance.inner
52        );
53        Self(instance)
54    }
55
56    /// Start the connection.
57    pub fn start<'a>(
58        &'a self,
59        configuration: &'a msquic::Configuration,
60        host: &'a str,
61        port: u16,
62    ) -> ConnectionStart<'a> {
63        ConnectionStart {
64            conn: self,
65            configuration,
66            host,
67            port,
68        }
69    }
70
71    /// Poll to start the connection.
72    fn poll_start(
73        &self,
74        cx: &mut Context<'_>,
75        configuration: &msquic::Configuration,
76        host: &str,
77        port: u16,
78    ) -> Poll<Result<(), StartError>> {
79        let mut exclusive = self.0.exclusive.lock().unwrap();
80        match exclusive.state {
81            ConnectionState::Open => {
82                self.0
83                    .msquic_conn
84                    .start(configuration, host, port)
85                    .map_err(StartError::OtherError)?;
86                exclusive.state = ConnectionState::Connecting;
87            }
88            ConnectionState::Connecting => {}
89            ConnectionState::Connected => return Poll::Ready(Ok(())),
90            ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
91                return Poll::Ready(Err(StartError::ConnectionLost(
92                    exclusive.error.as_ref().expect("error").clone(),
93                )));
94            }
95        }
96        exclusive.start_waiters.push(cx.waker().clone());
97        Poll::Pending
98    }
99
100    /// Open a new outbound stream.
101    pub fn open_outbound_stream(
102        &self,
103        stream_type: StreamType,
104        fail_on_blocked: bool,
105    ) -> OpenOutboundStream<'_> {
106        OpenOutboundStream {
107            conn: &self.0,
108            stream_type: Some(stream_type),
109            stream: None,
110            fail_on_blocked,
111        }
112    }
113
114    /// Accept an inbound bidilectional stream.
115    pub fn accept_inbound_stream(&self) -> AcceptInboundStream<'_> {
116        AcceptInboundStream { conn: self }
117    }
118
119    /// Poll to accept an inbound bidilectional stream.
120    pub fn poll_accept_inbound_stream(
121        &self,
122        cx: &mut Context<'_>,
123    ) -> Poll<Result<Stream, StreamStartError>> {
124        let mut exclusive = self.0.exclusive.lock().unwrap();
125        match exclusive.state {
126            ConnectionState::Open => {
127                return Poll::Ready(Err(StreamStartError::ConnectionNotStarted));
128            }
129            ConnectionState::Connecting => {
130                exclusive.start_waiters.push(cx.waker().clone());
131                return Poll::Pending;
132            }
133            ConnectionState::Connected => {}
134            ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
135                return Poll::Ready(Err(StreamStartError::ConnectionLost(
136                    exclusive.error.as_ref().expect("error").clone(),
137                )));
138            }
139        }
140
141        if !exclusive.inbound_streams.is_empty() {
142            return Poll::Ready(Ok(exclusive.inbound_streams.pop_front().unwrap()));
143        }
144        exclusive.inbound_stream_waiters.push(cx.waker().clone());
145        Poll::Pending
146    }
147
148    /// Accept an inbound unidirectional stream.
149    pub fn accept_inbound_uni_stream(&self) -> AcceptInboundUniStream<'_> {
150        AcceptInboundUniStream { conn: self }
151    }
152
153    /// Poll to accept an inbound unidirectional stream.
154    pub fn poll_accept_inbound_uni_stream(
155        &self,
156        cx: &mut Context<'_>,
157    ) -> Poll<Result<ReadStream, StreamStartError>> {
158        let mut exclusive = self.0.exclusive.lock().unwrap();
159        match exclusive.state {
160            ConnectionState::Open => {
161                return Poll::Ready(Err(StreamStartError::ConnectionNotStarted));
162            }
163            ConnectionState::Connecting => {
164                exclusive.start_waiters.push(cx.waker().clone());
165                return Poll::Pending;
166            }
167            ConnectionState::Connected => {}
168            ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
169                return Poll::Ready(Err(StreamStartError::ConnectionLost(
170                    exclusive.error.as_ref().expect("error").clone(),
171                )));
172            }
173        }
174
175        if !exclusive.inbound_uni_streams.is_empty() {
176            return Poll::Ready(Ok(exclusive.inbound_uni_streams.pop_front().unwrap()));
177        }
178        exclusive
179            .inbound_uni_stream_waiters
180            .push(cx.waker().clone());
181        Poll::Pending
182    }
183
184    /// Poll to receive a datagram.
185    pub fn poll_receive_datagram(
186        &self,
187        cx: &mut Context<'_>,
188    ) -> Poll<Result<Bytes, DgramReceiveError>> {
189        let mut exclusive = self.0.exclusive.lock().unwrap();
190        match exclusive.state {
191            ConnectionState::Open => {
192                return Poll::Ready(Err(DgramReceiveError::ConnectionNotStarted));
193            }
194            ConnectionState::Connecting => {
195                exclusive.start_waiters.push(cx.waker().clone());
196                return Poll::Pending;
197            }
198            ConnectionState::Connected => {}
199            ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
200                return Poll::Ready(Err(DgramReceiveError::ConnectionLost(
201                    exclusive.error.as_ref().expect("error").clone(),
202                )));
203            }
204        }
205
206        if let Some(buf) = exclusive.recv_buffers.pop_front() {
207            Poll::Ready(Ok(buf))
208        } else {
209            exclusive.recv_waiters.push(cx.waker().clone());
210            Poll::Pending
211        }
212    }
213
214    /// Poll to send a datagram.
215    pub fn poll_send_datagram(
216        &self,
217        cx: &mut Context<'_>,
218        buf: &Bytes,
219    ) -> Poll<Result<(), DgramSendError>> {
220        let mut exclusive = self.0.exclusive.lock().unwrap();
221        match exclusive.state {
222            ConnectionState::Open => {
223                return Poll::Ready(Err(DgramSendError::ConnectionNotStarted));
224            }
225            ConnectionState::Connecting => {
226                exclusive.start_waiters.push(cx.waker().clone());
227                return Poll::Pending;
228            }
229            ConnectionState::Connected => {}
230            ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
231                return Poll::Ready(Err(DgramSendError::ConnectionLost(
232                    exclusive.error.as_ref().expect("error").clone(),
233                )));
234            }
235        }
236
237        let mut write_buf = exclusive.write_pool.pop().unwrap_or(WriteBuffer::new());
238        let _ = write_buf.put_zerocopy(buf);
239        let buffers = unsafe {
240            let (data, len) = write_buf.get_buffers();
241            std::slice::from_raw_parts(data, len)
242        };
243        let res = unsafe {
244            self.0.msquic_conn.datagram_send(
245                buffers,
246                msquic::SendFlags::NONE,
247                write_buf.into_raw() as *const _,
248            )
249        }
250        .map_err(DgramSendError::OtherError);
251        Poll::Ready(res)
252    }
253
254    /// Send a datagram.
255    pub fn send_datagram(&self, buf: &Bytes) -> Result<(), DgramSendError> {
256        let mut exclusive = self.0.exclusive.lock().unwrap();
257        match exclusive.state {
258            ConnectionState::Open => {
259                return Err(DgramSendError::ConnectionNotStarted);
260            }
261            ConnectionState::Connecting => {
262                return Err(DgramSendError::ConnectionNotStarted);
263            }
264            ConnectionState::Connected => {}
265            ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
266                return Err(DgramSendError::ConnectionLost(
267                    exclusive.error.as_ref().expect("error").clone(),
268                ));
269            }
270        }
271
272        if !exclusive.dgram_send_enabled {
273            return Err(DgramSendError::Denied);
274        }
275        if buf.len() > exclusive.dgram_max_send_length as usize {
276            return Err(DgramSendError::TooBig);
277        }
278
279        let mut write_buf = exclusive.write_pool.pop().unwrap_or(WriteBuffer::new());
280        let _ = write_buf.put_zerocopy(buf);
281        let buffers = unsafe {
282            let (data, len) = write_buf.get_buffers();
283            std::slice::from_raw_parts(data, len)
284        };
285        unsafe {
286            self.0.msquic_conn.datagram_send(
287                buffers,
288                msquic::SendFlags::NONE,
289                write_buf.into_raw() as *const _,
290            )
291        }
292        .map_err(DgramSendError::OtherError)?;
293        Ok(())
294    }
295
296    /// Poll to shutdown the connection.
297    pub fn poll_shutdown(
298        &self,
299        cx: &mut Context<'_>,
300        error_code: u64,
301    ) -> Poll<Result<(), ShutdownError>> {
302        let mut exclusive = self.0.exclusive.lock().unwrap();
303        match exclusive.state {
304            ConnectionState::Open => {
305                return Poll::Ready(Err(ShutdownError::ConnectionNotStarted));
306            }
307            ConnectionState::Connecting => {
308                exclusive.start_waiters.push(cx.waker().clone());
309                return Poll::Pending;
310            }
311            ConnectionState::Connected => {
312                self.0
313                    .msquic_conn
314                    .shutdown(msquic::ConnectionShutdownFlags::NONE, error_code);
315                exclusive.state = ConnectionState::Shutdown;
316            }
317            ConnectionState::Shutdown => {}
318            ConnectionState::ShutdownComplete => {
319                if let Some(ConnectionError::ShutdownByLocal) = &exclusive.error {
320                    return Poll::Ready(Ok(()));
321                } else {
322                    return Poll::Ready(Err(ShutdownError::ConnectionLost(
323                        exclusive.error.as_ref().expect("error").clone(),
324                    )));
325                }
326            }
327        }
328
329        exclusive.shutdown_waiters.push(cx.waker().clone());
330        Poll::Pending
331    }
332
333    /// Shutdown the connection.
334    pub fn shutdown(&self, error_code: u64) -> Result<(), ShutdownError> {
335        let mut exclusive = self.0.exclusive.lock().unwrap();
336        match exclusive.state {
337            ConnectionState::Open | ConnectionState::Connecting => {
338                return Err(ShutdownError::ConnectionNotStarted);
339            }
340            ConnectionState::Connected => {
341                self.0
342                    .msquic_conn
343                    .shutdown(msquic::ConnectionShutdownFlags::NONE, error_code);
344                exclusive.state = ConnectionState::Shutdown;
345            }
346            _ => {}
347        }
348        Ok(())
349    }
350
351    /// Get the local address of the connection.
352    pub fn get_local_addr(&self) -> Result<SocketAddr, ConnectionError> {
353        self.0
354            .msquic_conn
355            .get_local_addr()
356            .map(|addr| addr.as_socket().expect("socket addr"))
357            .map_err(ConnectionError::OtherError)
358    }
359
360    /// Get the remote address of the connection.
361    pub fn get_remote_addr(&self) -> Result<SocketAddr, ConnectionError> {
362        self.0
363            .msquic_conn
364            .get_remote_addr()
365            .map(|addr| addr.as_socket().expect("socket addr"))
366            .map_err(ConnectionError::OtherError)
367    }
368}
369
370struct ConnectionInstance {
371    inner: Arc<ConnectionInner>,
372    msquic_conn: msquic::Connection,
373}
374
375impl Deref for ConnectionInstance {
376    type Target = ConnectionInner;
377
378    fn deref(&self) -> &Self::Target {
379        &self.inner
380    }
381}
382
383impl Drop for ConnectionInstance {
384    fn drop(&mut self) {
385        trace!("ConnectionInstance({:p}) dropping", self);
386    }
387}
388
389struct ConnectionInner {
390    exclusive: Mutex<ConnectionInnerExclusive>,
391}
392
393struct ConnectionInnerExclusive {
394    state: ConnectionState,
395    error: Option<ConnectionError>,
396    start_waiters: Vec<Waker>,
397    inbound_stream_waiters: Vec<Waker>,
398    inbound_uni_stream_waiters: Vec<Waker>,
399    inbound_streams: VecDeque<crate::stream::Stream>,
400    inbound_uni_streams: VecDeque<crate::stream::ReadStream>,
401    recv_buffers: VecDeque<Bytes>,
402    recv_waiters: Vec<Waker>,
403    write_pool: Vec<WriteBuffer>,
404    dgram_send_enabled: bool,
405    dgram_max_send_length: u16,
406    shutdown_waiters: Vec<Waker>,
407}
408
409impl ConnectionInner {
410    fn new(state: ConnectionState) -> Self {
411        Self {
412            exclusive: Mutex::new(ConnectionInnerExclusive {
413                state,
414                error: None,
415                start_waiters: Vec::new(),
416                inbound_stream_waiters: Vec::new(),
417                inbound_uni_stream_waiters: Vec::new(),
418                inbound_streams: VecDeque::new(),
419                inbound_uni_streams: VecDeque::new(),
420                recv_buffers: VecDeque::new(),
421                recv_waiters: Vec::new(),
422                write_pool: Vec::new(),
423                dgram_send_enabled: false,
424                dgram_max_send_length: 0,
425                shutdown_waiters: Vec::new(),
426            }),
427        }
428    }
429
430    fn handle_event_connected(
431        &self,
432        _session_resumed: bool,
433        _negotiated_alpn: &[u8],
434    ) -> Result<(), msquic::Status> {
435        trace!("ConnectionInner({:p}) Connected", self);
436
437        let mut exclusive = self.exclusive.lock().unwrap();
438        exclusive.state = ConnectionState::Connected;
439        exclusive
440            .start_waiters
441            .drain(..)
442            .for_each(|waker| waker.wake());
443        Ok(())
444    }
445
446    fn handle_event_shutdown_initiated_by_transport(
447        &self,
448        status: msquic::Status,
449        error_code: u64,
450    ) -> Result<(), msquic::Status> {
451        trace!(
452            "ConnectionInner({:p}) Transport shutdown {:?}",
453            self,
454            status
455        );
456
457        let mut exclusive = self.exclusive.lock().unwrap();
458        exclusive.state = ConnectionState::Shutdown;
459        exclusive.error = Some(ConnectionError::ShutdownByTransport(status, error_code));
460        exclusive
461            .start_waiters
462            .drain(..)
463            .for_each(|waker| waker.wake());
464        exclusive
465            .inbound_stream_waiters
466            .drain(..)
467            .for_each(|waker| waker.wake());
468        Ok(())
469    }
470
471    fn handle_event_shutdown_initiated_by_peer(
472        &self,
473        error_code: u64,
474    ) -> Result<(), msquic::Status> {
475        trace!("ConnectionInner({:p}) App shutdown {}", self, error_code);
476
477        let mut exclusive = self.exclusive.lock().unwrap();
478        exclusive.state = ConnectionState::Shutdown;
479        exclusive.error = Some(ConnectionError::ShutdownByPeer(error_code));
480        exclusive
481            .start_waiters
482            .drain(..)
483            .for_each(|waker| waker.wake());
484        exclusive
485            .inbound_stream_waiters
486            .drain(..)
487            .for_each(|waker| waker.wake());
488        Ok(())
489    }
490
491    fn handle_event_shutdown_complete(
492        &self,
493        handshake_completed: bool,
494        peer_acknowledged_shutdown: bool,
495        app_close_in_progress: bool,
496    ) -> Result<(), msquic::Status> {
497        trace!("ConnectionInner({:p}) Shutdown complete: handshake_completed={}, peer_acknowledged_shutdown={}, app_close_in_progress={}",
498            self, handshake_completed, peer_acknowledged_shutdown, app_close_in_progress
499        );
500
501        {
502            let mut exclusive = self.exclusive.lock().unwrap();
503            exclusive.state = ConnectionState::ShutdownComplete;
504            if exclusive.error.is_none() {
505                exclusive.error = Some(ConnectionError::ShutdownByLocal);
506            }
507            exclusive
508                .start_waiters
509                .drain(..)
510                .for_each(|waker| waker.wake());
511            exclusive
512                .inbound_stream_waiters
513                .drain(..)
514                .for_each(|waker| waker.wake());
515            exclusive
516                .shutdown_waiters
517                .drain(..)
518                .for_each(|waker| waker.wake());
519        }
520        Ok(())
521    }
522
523    fn handle_event_peer_stream_started(
524        &self,
525        stream: msquic::StreamRef,
526        flags: msquic::StreamOpenFlags,
527    ) -> Result<(), msquic::Status> {
528        let stream_type = if (flags & msquic::StreamOpenFlags::UNIDIRECTIONAL)
529            == msquic::StreamOpenFlags::UNIDIRECTIONAL
530        {
531            StreamType::Unidirectional
532        } else {
533            StreamType::Bidirectional
534        };
535        trace!(
536            "ConnectionInner({:p}) Peer stream started {:?}",
537            self,
538            stream_type
539        );
540
541        let stream = Stream::from_raw(unsafe { stream.as_raw() }, stream_type);
542        if (flags & msquic::StreamOpenFlags::UNIDIRECTIONAL)
543            == msquic::StreamOpenFlags::UNIDIRECTIONAL
544        {
545            if let (Some(read_stream), None) = stream.split() {
546                let mut exclusive = self.exclusive.lock().unwrap();
547                exclusive.inbound_uni_streams.push_back(read_stream);
548                exclusive
549                    .inbound_uni_stream_waiters
550                    .drain(..)
551                    .for_each(|waker| waker.wake());
552            } else {
553                unreachable!();
554            }
555        } else {
556            {
557                let mut exclusive = self.exclusive.lock().unwrap();
558                exclusive.inbound_streams.push_back(stream);
559                exclusive
560                    .inbound_stream_waiters
561                    .drain(..)
562                    .for_each(|waker| waker.wake());
563            }
564        }
565
566        Ok(())
567    }
568
569    fn handle_event_streams_available(
570        &self,
571        bidirectional_count: u16,
572        unidirectional_count: u16,
573    ) -> Result<(), msquic::Status> {
574        trace!(
575            "ConnectionInner({:p}) Streams available bidirectional_count:{} unidirectional_count:{}",
576            self,
577            bidirectional_count,
578            unidirectional_count
579        );
580        Ok(())
581    }
582
583    fn handle_event_datagram_state_changed(
584        &self,
585        send_enabled: bool,
586        max_send_length: u16,
587    ) -> Result<(), msquic::Status> {
588        trace!(
589            "ConnectionInner({:p}) Datagram state changed send_enabled:{} max_send_length:{}",
590            self,
591            send_enabled,
592            max_send_length
593        );
594        let mut exclusive = self.exclusive.lock().unwrap();
595        exclusive.dgram_send_enabled = send_enabled;
596        exclusive.dgram_max_send_length = max_send_length;
597        Ok(())
598    }
599
600    fn handle_event_datagram_received(
601        &self,
602        buffer: &msquic::BufferRef,
603        _flags: msquic::ReceiveFlags,
604    ) -> Result<(), msquic::Status> {
605        trace!("ConnectionInner({:p}) Datagram received", self);
606        let buf = Bytes::copy_from_slice(buffer.as_bytes());
607        {
608            let mut exclusive = self.exclusive.lock().unwrap();
609            exclusive.recv_buffers.push_back(buf);
610            exclusive
611                .recv_waiters
612                .drain(..)
613                .for_each(|waker| waker.wake());
614        }
615        Ok(())
616    }
617
618    fn handle_event_datagram_send_state_changed(
619        &self,
620        client_context: *const c_void,
621        state: msquic::DatagramSendState,
622    ) -> Result<(), msquic::Status> {
623        trace!(
624            "ConnectionInner({:p}) Datagram send state changed state:{:?}",
625            self,
626            state
627        );
628        match state {
629            msquic::DatagramSendState::Sent | msquic::DatagramSendState::Canceled => {
630                let mut write_buf = unsafe { WriteBuffer::from_raw(client_context) };
631                let mut exclusive = self.exclusive.lock().unwrap();
632                write_buf.reset();
633                exclusive.write_pool.push(write_buf);
634            }
635            _ => {}
636        }
637        Ok(())
638    }
639
640    fn callback_handler_impl(
641        &self,
642        _connection: msquic::ConnectionRef,
643        ev: msquic::ConnectionEvent,
644    ) -> Result<(), msquic::Status> {
645        match ev {
646            msquic::ConnectionEvent::Connected {
647                session_resumed,
648                negotiated_alpn,
649            } => self.handle_event_connected(session_resumed, negotiated_alpn),
650            msquic::ConnectionEvent::ShutdownInitiatedByTransport { status, error_code } => {
651                self.handle_event_shutdown_initiated_by_transport(status, error_code)
652            }
653            msquic::ConnectionEvent::ShutdownInitiatedByPeer { error_code } => {
654                self.handle_event_shutdown_initiated_by_peer(error_code)
655            }
656            msquic::ConnectionEvent::ShutdownComplete {
657                handshake_completed,
658                peer_acknowledged_shutdown,
659                app_close_in_progress,
660            } => self.handle_event_shutdown_complete(
661                handshake_completed,
662                peer_acknowledged_shutdown,
663                app_close_in_progress,
664            ),
665            msquic::ConnectionEvent::PeerStreamStarted { stream, flags } => {
666                self.handle_event_peer_stream_started(stream, flags)
667            }
668            msquic::ConnectionEvent::StreamsAvailable {
669                bidirectional_count,
670                unidirectional_count,
671            } => self.handle_event_streams_available(bidirectional_count, unidirectional_count),
672            msquic::ConnectionEvent::DatagramStateChanged {
673                send_enabled,
674                max_send_length,
675            } => self.handle_event_datagram_state_changed(send_enabled, max_send_length),
676            msquic::ConnectionEvent::DatagramReceived { buffer, flags } => {
677                self.handle_event_datagram_received(buffer, flags)
678            }
679            msquic::ConnectionEvent::DatagramSendStateChanged {
680                client_context,
681                state,
682            } => self.handle_event_datagram_send_state_changed(client_context, state),
683            _ => {
684                trace!("ConnectionInner({:p}) Other callback", self);
685                Ok(())
686            }
687        }
688    }
689}
690impl Drop for ConnectionInner {
691    fn drop(&mut self) {
692        trace!("ConnectionInner({:p}) dropping", self);
693    }
694}
695
696#[derive(Debug, PartialEq)]
697enum ConnectionState {
698    Open,
699    Connecting,
700    Connected,
701    Shutdown,
702    ShutdownComplete,
703}
704
705/// Errors that can occur when managing a connection.
706#[derive(Debug, Error, Clone)]
707pub enum ConnectionError {
708    #[error("connection shutdown by transport: status {0:?}, error 0x{1:x}")]
709    ShutdownByTransport(msquic::Status, u64),
710    #[error("connection shutdown by peer: error 0x{0:x}")]
711    ShutdownByPeer(u64),
712    #[error("connection shutdown by local")]
713    ShutdownByLocal,
714    #[error("connection closed")]
715    ConnectionClosed,
716    #[error("other error: status {0:?}")]
717    OtherError(msquic::Status),
718}
719
720/// Errors that can occur when receiving a datagram.
721#[derive(Debug, Error, Clone)]
722pub enum DgramReceiveError {
723    #[error("connection not started yet")]
724    ConnectionNotStarted,
725    #[error("connection lost")]
726    ConnectionLost(#[from] ConnectionError),
727    #[error("other error: status {0:?}")]
728    OtherError(msquic::Status),
729}
730
731/// Errors that can occur when sending a datagram.
732#[derive(Debug, Error, Clone)]
733pub enum DgramSendError {
734    #[error("connection not started yet")]
735    ConnectionNotStarted,
736    #[error("not allowed for sending dgram")]
737    Denied,
738    #[error("exceeded maximum data size for sending dgram")]
739    TooBig,
740    #[error("connection lost")]
741    ConnectionLost(#[from] ConnectionError),
742    #[error("other error: status {0:?}")]
743    OtherError(msquic::Status),
744}
745
746/// Errors that can occur when starting a connection.
747#[derive(Debug, Error, Clone)]
748pub enum StartError {
749    #[error("connection lost")]
750    ConnectionLost(#[from] ConnectionError),
751    #[error("other error: status {0:?}")]
752    OtherError(msquic::Status),
753}
754
755/// Errors that can occur when shutdowning a connection.
756#[derive(Debug, Error, Clone)]
757pub enum ShutdownError {
758    #[error("connection not started yet")]
759    ConnectionNotStarted,
760    #[error("connection lost")]
761    ConnectionLost(#[from] ConnectionError),
762    #[error("other error: status {0:?}")]
763    OtherError(msquic::Status),
764}
765
766/// Future produced by [`Connection::start()`].
767pub struct ConnectionStart<'a> {
768    conn: &'a Connection,
769    configuration: &'a msquic::Configuration,
770    host: &'a str,
771    port: u16,
772}
773
774impl Future for ConnectionStart<'_> {
775    type Output = Result<(), StartError>;
776
777    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
778        self.conn
779            .poll_start(cx, self.configuration, self.host, self.port)
780    }
781}
782
783/// Future produced by [`Connection::open_outbound_stream()`].
784pub struct OpenOutboundStream<'a> {
785    conn: &'a ConnectionInstance,
786    stream_type: Option<crate::stream::StreamType>,
787    stream: Option<crate::stream::Stream>,
788    fail_on_blocked: bool,
789}
790
791impl Future for OpenOutboundStream<'_> {
792    type Output = Result<crate::stream::Stream, StreamStartError>;
793
794    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
795        let this = self.get_mut();
796        let OpenOutboundStream {
797            conn,
798            ref mut stream_type,
799            ref mut stream,
800            fail_on_blocked: fail_blocked,
801            ..
802        } = *this;
803
804        let mut exclusive = conn.inner.exclusive.lock().unwrap();
805        match exclusive.state {
806            ConnectionState::Open => {
807                return Poll::Ready(Err(StreamStartError::ConnectionNotStarted));
808            }
809            ConnectionState::Connecting => {
810                exclusive.start_waiters.push(cx.waker().clone());
811                return Poll::Pending;
812            }
813            ConnectionState::Connected => {}
814            ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
815                return Poll::Ready(Err(StreamStartError::ConnectionLost(
816                    exclusive.error.as_ref().expect("error").clone(),
817                )));
818            }
819        }
820        if stream.is_none() {
821            match Stream::open(&conn.msquic_conn, stream_type.take().unwrap()) {
822                Ok(new_stream) => {
823                    *stream = Some(new_stream);
824                }
825                Err(e) => return Poll::Ready(Err(e)),
826            }
827        }
828        stream
829            .as_mut()
830            .unwrap()
831            .poll_start(cx, fail_blocked)
832            .map(|res| res.map(|_| stream.take().unwrap()))
833    }
834}
835
836/// Future produced by [`Connection::accept_inbound_stream()`].
837pub struct AcceptInboundStream<'a> {
838    conn: &'a Connection,
839}
840
841impl Future for AcceptInboundStream<'_> {
842    type Output = Result<Stream, StreamStartError>;
843
844    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
845        self.conn.poll_accept_inbound_stream(cx)
846    }
847}
848
849/// Future produced by [`Connection::accept_inbound_uni_stream()`].
850pub struct AcceptInboundUniStream<'a> {
851    conn: &'a Connection,
852}
853
854impl Future for AcceptInboundUniStream<'_> {
855    type Output = Result<ReadStream, StreamStartError>;
856
857    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
858        self.conn.poll_accept_inbound_uni_stream(cx)
859    }
860}