1use crate::buffer::WriteBuffer;
2use crate::stream::{ReadStream, StartError as StreamStartError, Stream, StreamType};
3
4use std::collections::VecDeque;
5use std::future::Future;
6use std::net::SocketAddr;
7use std::ops::Deref;
8use std::pin::Pin;
9use std::sync::{Arc, Mutex};
10use std::task::{Context, Poll, Waker};
11
12use bytes::Bytes;
13use libc::c_void;
14use thiserror::Error;
15use tracing::trace;
16
/// Handle to a QUIC connection.
///
/// The handle wraps an `Arc<ConnectionInstance>`, so cloning it is cheap and
/// every clone refers to the same underlying connection.
#[derive(Clone)]
pub struct Connection(Arc<ConnectionInstance>);
19
20impl Connection {
21 pub fn new(registration: &msquic::Registration) -> Result<Self, ConnectionError> {
25 let inner = Arc::new(ConnectionInner::new(ConnectionState::Open));
26 let inner_in_ev = inner.clone();
27 let msquic_conn = msquic::Connection::open(registration, move |conn_ref, ev| {
28 inner_in_ev.callback_handler_impl(conn_ref, ev)
29 })
30 .map_err(ConnectionError::OtherError)?;
31 let instance = Arc::new(ConnectionInstance { inner, msquic_conn });
32 trace!(
33 "ConnectionInstance({:p}, Inner: {:p}) Open by local",
34 instance,
35 instance.inner
36 );
37 Ok(Self(instance))
38 }
39
40 pub(crate) fn from_raw(handle: msquic::ffi::HQUIC) -> Self {
41 let msquic_conn = unsafe { msquic::Connection::from_raw(handle) };
42 let inner = Arc::new(ConnectionInner::new(ConnectionState::Connected));
43 let inner_in_ev = inner.clone();
44 msquic_conn.set_callback_handler(move |conn_ref, ev| {
45 inner_in_ev.callback_handler_impl(conn_ref, ev)
46 });
47 let instance = Arc::new(ConnectionInstance { inner, msquic_conn });
48 trace!(
49 "ConnectionInstance({:p}, Inner: {:p}) Open by peer",
50 instance,
51 instance.inner
52 );
53 Self(instance)
54 }
55
56 pub fn start<'a>(
58 &'a self,
59 configuration: &'a msquic::Configuration,
60 host: &'a str,
61 port: u16,
62 ) -> ConnectionStart<'a> {
63 ConnectionStart {
64 conn: self,
65 configuration,
66 host,
67 port,
68 }
69 }
70
71 fn poll_start(
73 &self,
74 cx: &mut Context<'_>,
75 configuration: &msquic::Configuration,
76 host: &str,
77 port: u16,
78 ) -> Poll<Result<(), StartError>> {
79 let mut exclusive = self.0.exclusive.lock().unwrap();
80 match exclusive.state {
81 ConnectionState::Open => {
82 self.0
83 .msquic_conn
84 .start(configuration, host, port)
85 .map_err(StartError::OtherError)?;
86 exclusive.state = ConnectionState::Connecting;
87 }
88 ConnectionState::Connecting => {}
89 ConnectionState::Connected => return Poll::Ready(Ok(())),
90 ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
91 return Poll::Ready(Err(StartError::ConnectionLost(
92 exclusive.error.as_ref().expect("error").clone(),
93 )));
94 }
95 }
96 exclusive.start_waiters.push(cx.waker().clone());
97 Poll::Pending
98 }
99
100 pub fn open_outbound_stream(
102 &self,
103 stream_type: StreamType,
104 fail_on_blocked: bool,
105 ) -> OpenOutboundStream<'_> {
106 OpenOutboundStream {
107 conn: &self.0,
108 stream_type: Some(stream_type),
109 stream: None,
110 fail_on_blocked,
111 }
112 }
113
114 pub fn accept_inbound_stream(&self) -> AcceptInboundStream<'_> {
116 AcceptInboundStream { conn: self }
117 }
118
119 pub fn poll_accept_inbound_stream(
121 &self,
122 cx: &mut Context<'_>,
123 ) -> Poll<Result<Stream, StreamStartError>> {
124 let mut exclusive = self.0.exclusive.lock().unwrap();
125 match exclusive.state {
126 ConnectionState::Open => {
127 return Poll::Ready(Err(StreamStartError::ConnectionNotStarted));
128 }
129 ConnectionState::Connecting => {
130 exclusive.start_waiters.push(cx.waker().clone());
131 return Poll::Pending;
132 }
133 ConnectionState::Connected => {}
134 ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
135 return Poll::Ready(Err(StreamStartError::ConnectionLost(
136 exclusive.error.as_ref().expect("error").clone(),
137 )));
138 }
139 }
140
141 if !exclusive.inbound_streams.is_empty() {
142 return Poll::Ready(Ok(exclusive.inbound_streams.pop_front().unwrap()));
143 }
144 exclusive.inbound_stream_waiters.push(cx.waker().clone());
145 Poll::Pending
146 }
147
148 pub fn accept_inbound_uni_stream(&self) -> AcceptInboundUniStream<'_> {
150 AcceptInboundUniStream { conn: self }
151 }
152
153 pub fn poll_accept_inbound_uni_stream(
155 &self,
156 cx: &mut Context<'_>,
157 ) -> Poll<Result<ReadStream, StreamStartError>> {
158 let mut exclusive = self.0.exclusive.lock().unwrap();
159 match exclusive.state {
160 ConnectionState::Open => {
161 return Poll::Ready(Err(StreamStartError::ConnectionNotStarted));
162 }
163 ConnectionState::Connecting => {
164 exclusive.start_waiters.push(cx.waker().clone());
165 return Poll::Pending;
166 }
167 ConnectionState::Connected => {}
168 ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
169 return Poll::Ready(Err(StreamStartError::ConnectionLost(
170 exclusive.error.as_ref().expect("error").clone(),
171 )));
172 }
173 }
174
175 if !exclusive.inbound_uni_streams.is_empty() {
176 return Poll::Ready(Ok(exclusive.inbound_uni_streams.pop_front().unwrap()));
177 }
178 exclusive
179 .inbound_uni_stream_waiters
180 .push(cx.waker().clone());
181 Poll::Pending
182 }
183
184 pub fn poll_receive_datagram(
186 &self,
187 cx: &mut Context<'_>,
188 ) -> Poll<Result<Bytes, DgramReceiveError>> {
189 let mut exclusive = self.0.exclusive.lock().unwrap();
190 match exclusive.state {
191 ConnectionState::Open => {
192 return Poll::Ready(Err(DgramReceiveError::ConnectionNotStarted));
193 }
194 ConnectionState::Connecting => {
195 exclusive.start_waiters.push(cx.waker().clone());
196 return Poll::Pending;
197 }
198 ConnectionState::Connected => {}
199 ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
200 return Poll::Ready(Err(DgramReceiveError::ConnectionLost(
201 exclusive.error.as_ref().expect("error").clone(),
202 )));
203 }
204 }
205
206 if let Some(buf) = exclusive.recv_buffers.pop_front() {
207 Poll::Ready(Ok(buf))
208 } else {
209 exclusive.recv_waiters.push(cx.waker().clone());
210 Poll::Pending
211 }
212 }
213
214 pub fn poll_send_datagram(
216 &self,
217 cx: &mut Context<'_>,
218 buf: &Bytes,
219 ) -> Poll<Result<(), DgramSendError>> {
220 let mut exclusive = self.0.exclusive.lock().unwrap();
221 match exclusive.state {
222 ConnectionState::Open => {
223 return Poll::Ready(Err(DgramSendError::ConnectionNotStarted));
224 }
225 ConnectionState::Connecting => {
226 exclusive.start_waiters.push(cx.waker().clone());
227 return Poll::Pending;
228 }
229 ConnectionState::Connected => {}
230 ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
231 return Poll::Ready(Err(DgramSendError::ConnectionLost(
232 exclusive.error.as_ref().expect("error").clone(),
233 )));
234 }
235 }
236
237 let mut write_buf = exclusive.write_pool.pop().unwrap_or(WriteBuffer::new());
238 let _ = write_buf.put_zerocopy(buf);
239 let buffers = unsafe {
240 let (data, len) = write_buf.get_buffers();
241 std::slice::from_raw_parts(data, len)
242 };
243 let res = unsafe {
244 self.0.msquic_conn.datagram_send(
245 buffers,
246 msquic::SendFlags::NONE,
247 write_buf.into_raw() as *const _,
248 )
249 }
250 .map_err(DgramSendError::OtherError);
251 Poll::Ready(res)
252 }
253
254 pub fn send_datagram(&self, buf: &Bytes) -> Result<(), DgramSendError> {
256 let mut exclusive = self.0.exclusive.lock().unwrap();
257 match exclusive.state {
258 ConnectionState::Open => {
259 return Err(DgramSendError::ConnectionNotStarted);
260 }
261 ConnectionState::Connecting => {
262 return Err(DgramSendError::ConnectionNotStarted);
263 }
264 ConnectionState::Connected => {}
265 ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
266 return Err(DgramSendError::ConnectionLost(
267 exclusive.error.as_ref().expect("error").clone(),
268 ));
269 }
270 }
271
272 if !exclusive.dgram_send_enabled {
273 return Err(DgramSendError::Denied);
274 }
275 if buf.len() > exclusive.dgram_max_send_length as usize {
276 return Err(DgramSendError::TooBig);
277 }
278
279 let mut write_buf = exclusive.write_pool.pop().unwrap_or(WriteBuffer::new());
280 let _ = write_buf.put_zerocopy(buf);
281 let buffers = unsafe {
282 let (data, len) = write_buf.get_buffers();
283 std::slice::from_raw_parts(data, len)
284 };
285 unsafe {
286 self.0.msquic_conn.datagram_send(
287 buffers,
288 msquic::SendFlags::NONE,
289 write_buf.into_raw() as *const _,
290 )
291 }
292 .map_err(DgramSendError::OtherError)?;
293 Ok(())
294 }
295
296 pub fn poll_shutdown(
298 &self,
299 cx: &mut Context<'_>,
300 error_code: u64,
301 ) -> Poll<Result<(), ShutdownError>> {
302 let mut exclusive = self.0.exclusive.lock().unwrap();
303 match exclusive.state {
304 ConnectionState::Open => {
305 return Poll::Ready(Err(ShutdownError::ConnectionNotStarted));
306 }
307 ConnectionState::Connecting => {
308 exclusive.start_waiters.push(cx.waker().clone());
309 return Poll::Pending;
310 }
311 ConnectionState::Connected => {
312 self.0
313 .msquic_conn
314 .shutdown(msquic::ConnectionShutdownFlags::NONE, error_code);
315 exclusive.state = ConnectionState::Shutdown;
316 }
317 ConnectionState::Shutdown => {}
318 ConnectionState::ShutdownComplete => {
319 if let Some(ConnectionError::ShutdownByLocal) = &exclusive.error {
320 return Poll::Ready(Ok(()));
321 } else {
322 return Poll::Ready(Err(ShutdownError::ConnectionLost(
323 exclusive.error.as_ref().expect("error").clone(),
324 )));
325 }
326 }
327 }
328
329 exclusive.shutdown_waiters.push(cx.waker().clone());
330 Poll::Pending
331 }
332
333 pub fn shutdown(&self, error_code: u64) -> Result<(), ShutdownError> {
335 let mut exclusive = self.0.exclusive.lock().unwrap();
336 match exclusive.state {
337 ConnectionState::Open | ConnectionState::Connecting => {
338 return Err(ShutdownError::ConnectionNotStarted);
339 }
340 ConnectionState::Connected => {
341 self.0
342 .msquic_conn
343 .shutdown(msquic::ConnectionShutdownFlags::NONE, error_code);
344 exclusive.state = ConnectionState::Shutdown;
345 }
346 _ => {}
347 }
348 Ok(())
349 }
350
351 pub fn get_local_addr(&self) -> Result<SocketAddr, ConnectionError> {
353 self.0
354 .msquic_conn
355 .get_local_addr()
356 .map(|addr| addr.as_socket().expect("socket addr"))
357 .map_err(ConnectionError::OtherError)
358 }
359
360 pub fn get_remote_addr(&self) -> Result<SocketAddr, ConnectionError> {
362 self.0
363 .msquic_conn
364 .get_remote_addr()
365 .map(|addr| addr.as_socket().expect("socket addr"))
366 .map_err(ConnectionError::OtherError)
367 }
368}
369
/// Pairs the msquic connection handle with the state shared with its event callback.
struct ConnectionInstance {
    /// State that is also referenced by the msquic event callback closure.
    inner: Arc<ConnectionInner>,
    /// The underlying msquic connection.
    msquic_conn: msquic::Connection,
}
374
375impl Deref for ConnectionInstance {
376 type Target = ConnectionInner;
377
378 fn deref(&self) -> &Self::Target {
379 &self.inner
380 }
381}
382
impl Drop for ConnectionInstance {
    /// Emits a trace line containing this instance's address when it is dropped.
    fn drop(&mut self) {
        trace!("ConnectionInstance({:p}) dropping", self);
    }
}
388
/// Connection state shared between the public handle and the msquic event callback.
struct ConnectionInner {
    /// All mutable state, guarded by a single mutex.
    exclusive: Mutex<ConnectionInnerExclusive>,
}
392
393struct ConnectionInnerExclusive {
394 state: ConnectionState,
395 error: Option<ConnectionError>,
396 start_waiters: Vec<Waker>,
397 inbound_stream_waiters: Vec<Waker>,
398 inbound_uni_stream_waiters: Vec<Waker>,
399 inbound_streams: VecDeque<crate::stream::Stream>,
400 inbound_uni_streams: VecDeque<crate::stream::ReadStream>,
401 recv_buffers: VecDeque<Bytes>,
402 recv_waiters: Vec<Waker>,
403 write_pool: Vec<WriteBuffer>,
404 dgram_send_enabled: bool,
405 dgram_max_send_length: u16,
406 shutdown_waiters: Vec<Waker>,
407}
408
409impl ConnectionInner {
410 fn new(state: ConnectionState) -> Self {
411 Self {
412 exclusive: Mutex::new(ConnectionInnerExclusive {
413 state,
414 error: None,
415 start_waiters: Vec::new(),
416 inbound_stream_waiters: Vec::new(),
417 inbound_uni_stream_waiters: Vec::new(),
418 inbound_streams: VecDeque::new(),
419 inbound_uni_streams: VecDeque::new(),
420 recv_buffers: VecDeque::new(),
421 recv_waiters: Vec::new(),
422 write_pool: Vec::new(),
423 dgram_send_enabled: false,
424 dgram_max_send_length: 0,
425 shutdown_waiters: Vec::new(),
426 }),
427 }
428 }
429
430 fn handle_event_connected(
431 &self,
432 _session_resumed: bool,
433 _negotiated_alpn: &[u8],
434 ) -> Result<(), msquic::Status> {
435 trace!("ConnectionInner({:p}) Connected", self);
436
437 let mut exclusive = self.exclusive.lock().unwrap();
438 exclusive.state = ConnectionState::Connected;
439 exclusive
440 .start_waiters
441 .drain(..)
442 .for_each(|waker| waker.wake());
443 Ok(())
444 }
445
446 fn handle_event_shutdown_initiated_by_transport(
447 &self,
448 status: msquic::Status,
449 error_code: u64,
450 ) -> Result<(), msquic::Status> {
451 trace!(
452 "ConnectionInner({:p}) Transport shutdown {:?}",
453 self,
454 status
455 );
456
457 let mut exclusive = self.exclusive.lock().unwrap();
458 exclusive.state = ConnectionState::Shutdown;
459 exclusive.error = Some(ConnectionError::ShutdownByTransport(status, error_code));
460 exclusive
461 .start_waiters
462 .drain(..)
463 .for_each(|waker| waker.wake());
464 exclusive
465 .inbound_stream_waiters
466 .drain(..)
467 .for_each(|waker| waker.wake());
468 Ok(())
469 }
470
471 fn handle_event_shutdown_initiated_by_peer(
472 &self,
473 error_code: u64,
474 ) -> Result<(), msquic::Status> {
475 trace!("ConnectionInner({:p}) App shutdown {}", self, error_code);
476
477 let mut exclusive = self.exclusive.lock().unwrap();
478 exclusive.state = ConnectionState::Shutdown;
479 exclusive.error = Some(ConnectionError::ShutdownByPeer(error_code));
480 exclusive
481 .start_waiters
482 .drain(..)
483 .for_each(|waker| waker.wake());
484 exclusive
485 .inbound_stream_waiters
486 .drain(..)
487 .for_each(|waker| waker.wake());
488 Ok(())
489 }
490
491 fn handle_event_shutdown_complete(
492 &self,
493 handshake_completed: bool,
494 peer_acknowledged_shutdown: bool,
495 app_close_in_progress: bool,
496 ) -> Result<(), msquic::Status> {
497 trace!("ConnectionInner({:p}) Shutdown complete: handshake_completed={}, peer_acknowledged_shutdown={}, app_close_in_progress={}",
498 self, handshake_completed, peer_acknowledged_shutdown, app_close_in_progress
499 );
500
501 {
502 let mut exclusive = self.exclusive.lock().unwrap();
503 exclusive.state = ConnectionState::ShutdownComplete;
504 if exclusive.error.is_none() {
505 exclusive.error = Some(ConnectionError::ShutdownByLocal);
506 }
507 exclusive
508 .start_waiters
509 .drain(..)
510 .for_each(|waker| waker.wake());
511 exclusive
512 .inbound_stream_waiters
513 .drain(..)
514 .for_each(|waker| waker.wake());
515 exclusive
516 .shutdown_waiters
517 .drain(..)
518 .for_each(|waker| waker.wake());
519 }
520 Ok(())
521 }
522
523 fn handle_event_peer_stream_started(
524 &self,
525 stream: msquic::StreamRef,
526 flags: msquic::StreamOpenFlags,
527 ) -> Result<(), msquic::Status> {
528 let stream_type = if (flags & msquic::StreamOpenFlags::UNIDIRECTIONAL)
529 == msquic::StreamOpenFlags::UNIDIRECTIONAL
530 {
531 StreamType::Unidirectional
532 } else {
533 StreamType::Bidirectional
534 };
535 trace!(
536 "ConnectionInner({:p}) Peer stream started {:?}",
537 self,
538 stream_type
539 );
540
541 let stream = Stream::from_raw(unsafe { stream.as_raw() }, stream_type);
542 if (flags & msquic::StreamOpenFlags::UNIDIRECTIONAL)
543 == msquic::StreamOpenFlags::UNIDIRECTIONAL
544 {
545 if let (Some(read_stream), None) = stream.split() {
546 let mut exclusive = self.exclusive.lock().unwrap();
547 exclusive.inbound_uni_streams.push_back(read_stream);
548 exclusive
549 .inbound_uni_stream_waiters
550 .drain(..)
551 .for_each(|waker| waker.wake());
552 } else {
553 unreachable!();
554 }
555 } else {
556 {
557 let mut exclusive = self.exclusive.lock().unwrap();
558 exclusive.inbound_streams.push_back(stream);
559 exclusive
560 .inbound_stream_waiters
561 .drain(..)
562 .for_each(|waker| waker.wake());
563 }
564 }
565
566 Ok(())
567 }
568
569 fn handle_event_streams_available(
570 &self,
571 bidirectional_count: u16,
572 unidirectional_count: u16,
573 ) -> Result<(), msquic::Status> {
574 trace!(
575 "ConnectionInner({:p}) Streams available bidirectional_count:{} unidirectional_count:{}",
576 self,
577 bidirectional_count,
578 unidirectional_count
579 );
580 Ok(())
581 }
582
583 fn handle_event_datagram_state_changed(
584 &self,
585 send_enabled: bool,
586 max_send_length: u16,
587 ) -> Result<(), msquic::Status> {
588 trace!(
589 "ConnectionInner({:p}) Datagram state changed send_enabled:{} max_send_length:{}",
590 self,
591 send_enabled,
592 max_send_length
593 );
594 let mut exclusive = self.exclusive.lock().unwrap();
595 exclusive.dgram_send_enabled = send_enabled;
596 exclusive.dgram_max_send_length = max_send_length;
597 Ok(())
598 }
599
600 fn handle_event_datagram_received(
601 &self,
602 buffer: &msquic::BufferRef,
603 _flags: msquic::ReceiveFlags,
604 ) -> Result<(), msquic::Status> {
605 trace!("ConnectionInner({:p}) Datagram received", self);
606 let buf = Bytes::copy_from_slice(buffer.as_bytes());
607 {
608 let mut exclusive = self.exclusive.lock().unwrap();
609 exclusive.recv_buffers.push_back(buf);
610 exclusive
611 .recv_waiters
612 .drain(..)
613 .for_each(|waker| waker.wake());
614 }
615 Ok(())
616 }
617
618 fn handle_event_datagram_send_state_changed(
619 &self,
620 client_context: *const c_void,
621 state: msquic::DatagramSendState,
622 ) -> Result<(), msquic::Status> {
623 trace!(
624 "ConnectionInner({:p}) Datagram send state changed state:{:?}",
625 self,
626 state
627 );
628 match state {
629 msquic::DatagramSendState::Sent | msquic::DatagramSendState::Canceled => {
630 let mut write_buf = unsafe { WriteBuffer::from_raw(client_context) };
631 let mut exclusive = self.exclusive.lock().unwrap();
632 write_buf.reset();
633 exclusive.write_pool.push(write_buf);
634 }
635 _ => {}
636 }
637 Ok(())
638 }
639
640 fn callback_handler_impl(
641 &self,
642 _connection: msquic::ConnectionRef,
643 ev: msquic::ConnectionEvent,
644 ) -> Result<(), msquic::Status> {
645 match ev {
646 msquic::ConnectionEvent::Connected {
647 session_resumed,
648 negotiated_alpn,
649 } => self.handle_event_connected(session_resumed, negotiated_alpn),
650 msquic::ConnectionEvent::ShutdownInitiatedByTransport { status, error_code } => {
651 self.handle_event_shutdown_initiated_by_transport(status, error_code)
652 }
653 msquic::ConnectionEvent::ShutdownInitiatedByPeer { error_code } => {
654 self.handle_event_shutdown_initiated_by_peer(error_code)
655 }
656 msquic::ConnectionEvent::ShutdownComplete {
657 handshake_completed,
658 peer_acknowledged_shutdown,
659 app_close_in_progress,
660 } => self.handle_event_shutdown_complete(
661 handshake_completed,
662 peer_acknowledged_shutdown,
663 app_close_in_progress,
664 ),
665 msquic::ConnectionEvent::PeerStreamStarted { stream, flags } => {
666 self.handle_event_peer_stream_started(stream, flags)
667 }
668 msquic::ConnectionEvent::StreamsAvailable {
669 bidirectional_count,
670 unidirectional_count,
671 } => self.handle_event_streams_available(bidirectional_count, unidirectional_count),
672 msquic::ConnectionEvent::DatagramStateChanged {
673 send_enabled,
674 max_send_length,
675 } => self.handle_event_datagram_state_changed(send_enabled, max_send_length),
676 msquic::ConnectionEvent::DatagramReceived { buffer, flags } => {
677 self.handle_event_datagram_received(buffer, flags)
678 }
679 msquic::ConnectionEvent::DatagramSendStateChanged {
680 client_context,
681 state,
682 } => self.handle_event_datagram_send_state_changed(client_context, state),
683 _ => {
684 trace!("ConnectionInner({:p}) Other callback", self);
685 Ok(())
686 }
687 }
688 }
689}
impl Drop for ConnectionInner {
    /// Emits a trace line containing this state's address when it is dropped.
    fn drop(&mut self) {
        trace!("ConnectionInner({:p}) dropping", self);
    }
}
695
/// Lifecycle of a connection as tracked by this module.
///
/// The enum is a plain tag, so it is `Copy` and fully `Eq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnectionState {
    /// Handle opened locally; the connection has not been started.
    Open,
    /// Start was issued; waiting for the `Connected` event.
    Connecting,
    /// The connection is established.
    Connected,
    /// A shutdown was initiated locally, by the peer, or by the transport.
    Shutdown,
    /// The `ShutdownComplete` event was received.
    ShutdownComplete,
}
704
/// Reason a connection failed or was shut down.
#[derive(Debug, Error, Clone)]
pub enum ConnectionError {
    /// The transport shut the connection down; carries the reported status and error code.
    #[error("connection shutdown by transport: status {0:?}, error 0x{1:x}")]
    ShutdownByTransport(msquic::Status, u64),
    /// The peer shut the connection down with the given error code.
    #[error("connection shutdown by peer: error 0x{0:x}")]
    ShutdownByPeer(u64),
    /// The shutdown completed without a peer or transport reason being recorded.
    #[error("connection shutdown by local")]
    ShutdownByLocal,
    /// The connection was closed.
    // NOTE(review): not constructed anywhere in this file — confirm where it is used.
    #[error("connection closed")]
    ConnectionClosed,
    /// An msquic call failed with the given status.
    #[error("other error: status {0:?}")]
    OtherError(msquic::Status),
}
719
/// Error returned when receiving a datagram.
#[derive(Debug, Error, Clone)]
pub enum DgramReceiveError {
    /// The connection was never started.
    #[error("connection not started yet")]
    ConnectionNotStarted,
    /// The connection is shutting down or closed; carries the reason.
    #[error("connection lost")]
    ConnectionLost(#[from] ConnectionError),
    /// An msquic call failed with the given status.
    // NOTE(review): not constructed anywhere in this file — confirm where it is used.
    #[error("other error: status {0:?}")]
    OtherError(msquic::Status),
}
730
/// Error returned when sending a datagram.
#[derive(Debug, Error, Clone)]
pub enum DgramSendError {
    /// The connection has not reached the connected state.
    #[error("connection not started yet")]
    ConnectionNotStarted,
    /// Datagram sending is not enabled on this connection.
    #[error("not allowed for sending dgram")]
    Denied,
    /// The payload is longer than the maximum datagram send length.
    #[error("exceeded maximum data size for sending dgram")]
    TooBig,
    /// The connection is shutting down or closed; carries the reason.
    #[error("connection lost")]
    ConnectionLost(#[from] ConnectionError),
    /// The msquic datagram send call failed with the given status.
    #[error("other error: status {0:?}")]
    OtherError(msquic::Status),
}
745
/// Error returned when starting a connection.
#[derive(Debug, Error, Clone)]
pub enum StartError {
    /// The connection was shut down before or while starting; carries the reason.
    #[error("connection lost")]
    ConnectionLost(#[from] ConnectionError),
    /// The msquic start call failed with the given status.
    #[error("other error: status {0:?}")]
    OtherError(msquic::Status),
}
754
/// Error returned when shutting a connection down.
#[derive(Debug, Error, Clone)]
pub enum ShutdownError {
    /// The connection has not been started (or, for the non-polling
    /// `shutdown`, has not finished connecting).
    #[error("connection not started yet")]
    ConnectionNotStarted,
    /// The shutdown was caused by the peer or the transport; carries the reason.
    #[error("connection lost")]
    ConnectionLost(#[from] ConnectionError),
    /// An msquic call failed with the given status.
    // NOTE(review): not constructed anywhere in this file — confirm where it is used.
    #[error("other error: status {0:?}")]
    OtherError(msquic::Status),
}
765
766pub struct ConnectionStart<'a> {
768 conn: &'a Connection,
769 configuration: &'a msquic::Configuration,
770 host: &'a str,
771 port: u16,
772}
773
774impl Future for ConnectionStart<'_> {
775 type Output = Result<(), StartError>;
776
777 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
778 self.conn
779 .poll_start(cx, self.configuration, self.host, self.port)
780 }
781}
782
783pub struct OpenOutboundStream<'a> {
785 conn: &'a ConnectionInstance,
786 stream_type: Option<crate::stream::StreamType>,
787 stream: Option<crate::stream::Stream>,
788 fail_on_blocked: bool,
789}
790
791impl Future for OpenOutboundStream<'_> {
792 type Output = Result<crate::stream::Stream, StreamStartError>;
793
794 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
795 let this = self.get_mut();
796 let OpenOutboundStream {
797 conn,
798 ref mut stream_type,
799 ref mut stream,
800 fail_on_blocked: fail_blocked,
801 ..
802 } = *this;
803
804 let mut exclusive = conn.inner.exclusive.lock().unwrap();
805 match exclusive.state {
806 ConnectionState::Open => {
807 return Poll::Ready(Err(StreamStartError::ConnectionNotStarted));
808 }
809 ConnectionState::Connecting => {
810 exclusive.start_waiters.push(cx.waker().clone());
811 return Poll::Pending;
812 }
813 ConnectionState::Connected => {}
814 ConnectionState::Shutdown | ConnectionState::ShutdownComplete => {
815 return Poll::Ready(Err(StreamStartError::ConnectionLost(
816 exclusive.error.as_ref().expect("error").clone(),
817 )));
818 }
819 }
820 if stream.is_none() {
821 match Stream::open(&conn.msquic_conn, stream_type.take().unwrap()) {
822 Ok(new_stream) => {
823 *stream = Some(new_stream);
824 }
825 Err(e) => return Poll::Ready(Err(e)),
826 }
827 }
828 stream
829 .as_mut()
830 .unwrap()
831 .poll_start(cx, fail_blocked)
832 .map(|res| res.map(|_| stream.take().unwrap()))
833 }
834}
835
836pub struct AcceptInboundStream<'a> {
838 conn: &'a Connection,
839}
840
841impl Future for AcceptInboundStream<'_> {
842 type Output = Result<Stream, StreamStartError>;
843
844 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
845 self.conn.poll_accept_inbound_stream(cx)
846 }
847}
848
849pub struct AcceptInboundUniStream<'a> {
851 conn: &'a Connection,
852}
853
854impl Future for AcceptInboundUniStream<'_> {
855 type Output = Result<ReadStream, StreamStartError>;
856
857 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
858 self.conn.poll_accept_inbound_uni_stream(cx)
859 }
860}