1use crate::buffer::{StreamRecvBuffer, WriteBuffer};
2use crate::connection::ConnectionError;
3
4#[cfg(feature = "msquic-2-5")]
5use msquic_v2_5 as msquic;
6#[cfg(feature = "msquic-seera")]
7use seera_msquic as msquic;
8
9use std::collections::VecDeque;
10use std::fmt;
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::{Arc, Mutex, RwLock};
14use std::task::{ready, Context, Poll, Waker};
15
16use bytes::Bytes;
17use libc::c_void;
18use rangemap::RangeSet;
19use thiserror::Error;
20use tracing::trace;
21
22#[derive(Debug, Clone, Copy, PartialEq)]
23pub enum StreamType {
24 Bidirectional,
25 Unidirectional,
26}
27
28#[derive(Debug)]
30pub struct Stream(Arc<StreamInstance>);
31
32impl Stream {
33 pub(crate) fn open(
34 msquic_conn: &msquic::Connection,
35 stream_type: StreamType,
36 ) -> Result<Self, StartError> {
37 let flags = if stream_type == StreamType::Unidirectional {
38 msquic::StreamOpenFlags::UNIDIRECTIONAL
39 } else {
40 msquic::StreamOpenFlags::NONE
41 };
42 let inner = Arc::new(StreamInner::new(
43 stream_type,
44 StreamSendState::Closed,
45 StreamRecvState::Closed,
46 true,
47 ));
48 let inner_in_ev = inner.clone();
49 let msquic_stream = msquic::Stream::open(msquic_conn, flags, move |stream_ref, ev| {
50 inner_in_ev.callback_handler_impl(stream_ref, ev)
51 })
52 .map_err(StartError::OtherError)?;
53 let stream_handle = unsafe { msquic_stream.as_raw() };
54 let instance = Arc::new(StreamInstance {
55 inner,
56 msquic_stream,
57 });
58 trace!(
59 "StreamInstance({:p}, Inner: {:p}, HQUIC: {:p}) Open by local",
60 instance,
61 instance.inner,
62 stream_handle
63 );
64
65 Ok(Self(instance))
66 }
67
68 pub(crate) fn from_raw(handle: msquic::ffi::HQUIC, stream_type: StreamType) -> Self {
69 let msquic_stream = unsafe { msquic::Stream::from_raw(handle) };
70 let send_state = if stream_type == StreamType::Bidirectional {
71 StreamSendState::StartComplete
72 } else {
73 StreamSendState::Closed
74 };
75 let inner = Arc::new(StreamInner::new(
76 stream_type,
77 send_state,
78 StreamRecvState::StartComplete,
79 false,
80 ));
81 let inner_in_ev = inner.clone();
82 msquic_stream.set_callback_handler(move |stream_ref, ev| {
83 inner_in_ev.callback_handler_impl(stream_ref, ev)
84 });
85 let stream_handle = unsafe { msquic_stream.as_raw() };
86 let stream = Self(Arc::new(StreamInstance {
87 inner,
88 msquic_stream,
89 }));
90 trace!(
91 "StreamInstance({:p}, Inner: {:p}, HQUIC: {:p}, id: {:?}) Start by peer",
92 stream.0,
93 stream.0.inner,
94 stream_handle,
95 stream.id()
96 );
97 stream
98 }
99
100 pub(crate) fn poll_start(
101 &mut self,
102 cx: &mut Context,
103 failed_on_block: bool,
104 ) -> Poll<Result<(), StartError>> {
105 let mut exclusive = self.0.inner.exclusive.lock().unwrap();
106 trace!(
107 "Stream(Inner: {:p}) poll_start state={:?}",
108 self.0.inner,
109 exclusive.state
110 );
111 match exclusive.state {
112 StreamState::Open => {
113 let res = self
114 .0
115 .msquic_stream
116 .start(
117 msquic::StreamStartFlags::SHUTDOWN_ON_FAIL
118 | msquic::StreamStartFlags::INDICATE_PEER_ACCEPT
119 | if failed_on_block {
120 msquic::StreamStartFlags::FAIL_BLOCKED
121 } else {
122 msquic::StreamStartFlags::NONE
123 },
124 )
125 .map_err(StartError::OtherError);
126 trace!(
127 "Stream(Inner: {:p}) poll_start start={:?}",
128 self.0.inner,
129 res
130 );
131 res?;
132 exclusive.state = StreamState::Start;
133 if self.0.inner.shared.stream_type == StreamType::Bidirectional {
134 exclusive.recv_state = StreamRecvState::Start;
135 }
136 exclusive.send_state = StreamSendState::Start;
137 }
138 StreamState::Start => {}
139 _ => {
140 if let Some(start_status) = &exclusive.start_status {
141 if start_status.is_ok() {
142 return Poll::Ready(Ok(()));
143 }
144 return Poll::Ready(Err(match start_status.try_as_status_code().unwrap() {
145 msquic::StatusCode::QUIC_STATUS_STREAM_LIMIT_REACHED => {
146 StartError::LimitReached
147 }
148 msquic::StatusCode::QUIC_STATUS_ABORTED
149 | msquic::StatusCode::QUIC_STATUS_INVALID_STATE => {
150 StartError::ConnectionLost(
151 exclusive.conn_error.as_ref().expect("conn_error").clone(),
152 )
153 }
154 _ => StartError::OtherError(start_status.clone()),
155 }));
156 } else {
157 return Poll::Ready(Ok(()));
158 }
159 }
160 }
161 exclusive.start_waiters.push(cx.waker().clone());
162 Poll::Pending
163 }
164
165 pub fn id(&self) -> Option<u64> {
167 self.0.id()
168 }
169
170 pub fn split(self) -> (Option<ReadStream>, Option<WriteStream>) {
172 match (
173 self.0.inner.shared.stream_type,
174 self.0.inner.shared.local_open,
175 ) {
176 (StreamType::Unidirectional, true) => (None, Some(WriteStream(self.0))),
177 (StreamType::Unidirectional, false) => (Some(ReadStream(self.0)), None),
178 (StreamType::Bidirectional, _) => {
179 (Some(ReadStream(self.0.clone())), Some(WriteStream(self.0)))
180 }
181 }
182 }
183
184 pub fn poll_read(
186 &mut self,
187 cx: &mut Context<'_>,
188 buf: &mut [u8],
189 ) -> Poll<Result<usize, ReadError>> {
190 self.0.poll_read(cx, buf)
191 }
192
193 pub fn poll_read_chunk(
195 &self,
196 cx: &mut Context<'_>,
197 ) -> Poll<Result<Option<StreamRecvBuffer>, ReadError>> {
198 self.0.poll_read_chunk(cx)
199 }
200
201 pub fn read_chunk(&self) -> ReadChunk<'_> {
203 self.0.read_chunk()
204 }
205
206 pub fn poll_write(
208 &mut self,
209 cx: &mut Context<'_>,
210 buf: &[u8],
211 fin: bool,
212 ) -> Poll<Result<usize, WriteError>> {
213 self.0.poll_write(cx, buf, fin)
214 }
215
216 pub fn poll_write_chunk(
218 &mut self,
219 cx: &mut Context<'_>,
220 chunk: &Bytes,
221 fin: bool,
222 ) -> Poll<Result<usize, WriteError>> {
223 self.0.poll_write_chunk(cx, chunk, fin)
224 }
225
226 pub fn write_chunk<'a>(&'a mut self, chunk: &'a Bytes, fin: bool) -> WriteChunk<'a> {
228 self.0.write_chunk(chunk, fin)
229 }
230
231 pub fn poll_write_chunks(
233 &mut self,
234 cx: &mut Context<'_>,
235 chunks: &[Bytes],
236 fin: bool,
237 ) -> Poll<Result<usize, WriteError>> {
238 self.0.poll_write_chunks(cx, chunks, fin)
239 }
240
241 pub fn write_chunks<'a>(&'a mut self, chunks: &'a [Bytes], fin: bool) -> WriteChunks<'a> {
243 self.0.write_chunks(chunks, fin)
244 }
245
246 pub fn poll_finish_write(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), WriteError>> {
248 self.0.poll_finish_write(cx)
249 }
250
251 pub fn poll_abort_write(
253 &mut self,
254 cx: &mut Context<'_>,
255 error_code: u64,
256 ) -> Poll<Result<(), WriteError>> {
257 self.0.poll_abort_write(cx, error_code)
258 }
259
260 pub fn abort_write(&mut self, error_code: u64) -> Result<(), WriteError> {
262 self.0.abort_write(error_code)
263 }
264
265 pub fn poll_abort_read(
267 &mut self,
268 cx: &mut Context<'_>,
269 error_code: u64,
270 ) -> Poll<Result<(), ReadError>> {
271 self.0.poll_abort_read(cx, error_code)
272 }
273
274 pub fn abort_read(&mut self, error_code: u64) -> Result<(), ReadError> {
276 self.0.abort_read(error_code)
277 }
278}
279
280#[derive(Debug)]
282pub struct ReadStream(Arc<StreamInstance>);
283
284impl ReadStream {
285 pub fn id(&self) -> Option<u64> {
287 self.0.id()
288 }
289
290 pub fn poll_read(
292 &mut self,
293 cx: &mut Context<'_>,
294 buf: &mut [u8],
295 ) -> Poll<Result<usize, ReadError>> {
296 self.0.poll_read(cx, buf)
297 }
298
299 pub fn poll_read_chunk(
301 &self,
302 cx: &mut Context<'_>,
303 ) -> Poll<Result<Option<StreamRecvBuffer>, ReadError>> {
304 self.0.poll_read_chunk(cx)
305 }
306
307 pub fn read_chunk(&self) -> ReadChunk<'_> {
309 self.0.read_chunk()
310 }
311
312 pub fn poll_abort_read(
314 &mut self,
315 cx: &mut Context<'_>,
316 error_code: u64,
317 ) -> Poll<Result<(), ReadError>> {
318 self.0.poll_abort_read(cx, error_code)
319 }
320
321 pub fn abort_read(&mut self, error_code: u64) -> Result<(), ReadError> {
323 self.0.abort_read(error_code)
324 }
325}
326
327#[derive(Debug)]
329pub struct WriteStream(Arc<StreamInstance>);
330
331impl WriteStream {
332 pub fn id(&self) -> Option<u64> {
334 self.0.id()
335 }
336
337 pub fn poll_write(
339 &mut self,
340 cx: &mut Context<'_>,
341 buf: &[u8],
342 fin: bool,
343 ) -> Poll<Result<usize, WriteError>> {
344 self.0.poll_write(cx, buf, fin)
345 }
346
347 pub fn poll_write_chunk(
349 &mut self,
350 cx: &mut Context<'_>,
351 chunk: &Bytes,
352 fin: bool,
353 ) -> Poll<Result<usize, WriteError>> {
354 self.0.poll_write_chunk(cx, chunk, fin)
355 }
356
357 pub fn write_chunk<'a>(&'a mut self, chunk: &'a Bytes, fin: bool) -> WriteChunk<'a> {
359 self.0.write_chunk(chunk, fin)
360 }
361
362 pub fn poll_write_chunks(
364 &mut self,
365 cx: &mut Context<'_>,
366 chunks: &[Bytes],
367 fin: bool,
368 ) -> Poll<Result<usize, WriteError>> {
369 self.0.poll_write_chunks(cx, chunks, fin)
370 }
371
372 pub fn write_chunks<'a>(&'a mut self, chunks: &'a [Bytes], fin: bool) -> WriteChunks<'a> {
374 self.0.write_chunks(chunks, fin)
375 }
376
377 pub fn poll_finish_write(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), WriteError>> {
379 self.0.poll_finish_write(cx)
380 }
381
382 pub fn poll_abort_write(
384 &mut self,
385 cx: &mut Context<'_>,
386 error_code: u64,
387 ) -> Poll<Result<(), WriteError>> {
388 self.0.poll_abort_write(cx, error_code)
389 }
390
391 pub fn abort_write(&mut self, error_code: u64) -> Result<(), WriteError> {
393 self.0.abort_write(error_code)
394 }
395}
396
397#[derive(Debug)]
398pub(crate) struct StreamInstance {
399 inner: Arc<StreamInner>,
400 msquic_stream: msquic::Stream,
401}
402
403impl StreamInstance {
404 pub(crate) fn id(&self) -> Option<u64> {
405 let id = { *self.inner.shared.id.read().unwrap() };
406 if id.is_some() {
407 id
408 } else {
409 let res = unsafe {
410 msquic::Api::get_param_auto::<u64>(
411 self.msquic_stream.as_raw(),
412 msquic::PARAM_STREAM_ID,
413 )
414 };
415 if let Ok(id) = res {
416 self.inner.shared.id.write().unwrap().replace(id);
417 Some(id)
418 } else {
419 None
420 }
421 }
422 }
423
424 fn poll_read(
425 self: &Arc<Self>,
426 cx: &mut Context<'_>,
427 buf: &mut [u8],
428 ) -> Poll<Result<usize, ReadError>> {
429 self.poll_read_generic(cx, |recv_buffers, read_complete_buffers| {
430 let mut read = 0;
431 let mut fin = false;
432 loop {
433 if read == buf.len() {
434 return ReadStatus::Readable(read);
435 }
436
437 match recv_buffers
438 .front_mut()
439 .and_then(|x| x.get_bytes_upto_size(buf.len() - read))
440 {
441 Some(slice) => {
442 let len = slice.len();
443 buf[read..read + len].copy_from_slice(slice);
444 read += len;
445 }
446 None => {
447 if let Some(mut recv_buffer) = recv_buffers.pop_front() {
448 recv_buffer.set_stream(self.clone());
449 fin = recv_buffer.fin();
450 read_complete_buffers.push(recv_buffer);
451 continue;
452 } else {
453 return (if read > 0 { Some(read) } else { None }, fin).into();
454 }
455 }
456 }
457 }
458 })
459 .map(|res| res.map(|x| x.unwrap_or(0)))
460 }
461
462 fn poll_read_chunk(
463 self: &Arc<Self>,
464 cx: &mut Context<'_>,
465 ) -> Poll<Result<Option<StreamRecvBuffer>, ReadError>> {
466 self.poll_read_generic(cx, |recv_buffers, _| {
467 recv_buffers
468 .pop_front()
469 .map(|mut recv_buffer| {
470 let fin = recv_buffer.fin();
471 recv_buffer.set_stream(self.clone());
472 (Some(recv_buffer), fin)
473 })
474 .unwrap_or((None, false))
475 .into()
476 })
477 }
478
479 fn read_chunk(self: &Arc<Self>) -> ReadChunk<'_> {
480 ReadChunk { stream: self }
481 }
482
483 fn poll_read_generic<T, U>(
484 &self,
485 cx: &mut Context<'_>,
486 mut read_fn: T,
487 ) -> Poll<Result<Option<U>, ReadError>>
488 where
489 T: FnMut(&mut VecDeque<StreamRecvBuffer>, &mut Vec<StreamRecvBuffer>) -> ReadStatus<U>,
490 {
491 let res;
492 let mut read_complete_buffers = Vec::new();
493 {
494 let mut exclusive = self.inner.exclusive.lock().unwrap();
495 match exclusive.recv_state {
496 StreamRecvState::Closed => {
497 return Poll::Ready(Err(ReadError::Closed));
498 }
499 StreamRecvState::Start => {
500 exclusive.start_waiters.push(cx.waker().clone());
501 return Poll::Pending;
502 }
503 StreamRecvState::StartComplete => {}
504 StreamRecvState::Shutdown => {
505 return Poll::Ready(Ok(None));
506 }
507 StreamRecvState::ShutdownComplete => {
508 if let Some(conn_error) = &exclusive.conn_error {
509 return Poll::Ready(Err(ReadError::ConnectionLost(conn_error.clone())));
510 } else if let Some(error_code) = &exclusive.recv_error_code {
511 return Poll::Ready(Err(ReadError::Reset(*error_code)));
512 } else {
513 return Poll::Ready(Ok(None));
514 }
515 }
516 }
517
518 let status = read_fn(&mut exclusive.recv_buffers, &mut read_complete_buffers);
519
520 res = match status {
521 ReadStatus::Readable(read) | ReadStatus::Blocked(Some(read)) => {
522 Poll::Ready(Ok(Some(read)))
523 }
524 ReadStatus::Finished(read) => {
525 exclusive.recv_state = StreamRecvState::Shutdown;
526 Poll::Ready(Ok(read))
527 }
528 ReadStatus::Blocked(None) => {
529 exclusive.read_waiters.push(cx.waker().clone());
530 Poll::Pending
531 }
532 };
533 }
534 res
535 }
536
537 fn poll_write(
538 &self,
539 cx: &mut Context<'_>,
540 buf: &[u8],
541 fin: bool,
542 ) -> Poll<Result<usize, WriteError>> {
543 self.poll_write_generic(cx, |write_buf| {
544 let written = write_buf.put_slice(buf);
545 if written == buf.len() && !fin {
546 WriteStatus::Writable(written)
547 } else {
548 (Some(written), fin).into()
549 }
550 })
551 .map(|res| res.map(|x| x.unwrap_or(0)))
552 }
553
554 fn poll_write_chunk(
555 &self,
556 cx: &mut Context<'_>,
557 chunk: &Bytes,
558 fin: bool,
559 ) -> Poll<Result<usize, WriteError>> {
560 self.poll_write_generic(cx, |write_buf| {
561 let written = write_buf.put_zerocopy(chunk);
562 if written == chunk.len() && !fin {
563 WriteStatus::Writable(written)
564 } else {
565 (Some(written), fin).into()
566 }
567 })
568 .map(|res| res.map(|x| x.unwrap_or(0)))
569 }
570
571 fn write_chunk<'a>(&'a self, chunk: &'a Bytes, fin: bool) -> WriteChunk<'a> {
572 WriteChunk {
573 stream: self,
574 chunk,
575 fin,
576 }
577 }
578
579 fn poll_write_chunks(
580 &self,
581 cx: &mut Context<'_>,
582 chunks: &[Bytes],
583 fin: bool,
584 ) -> Poll<Result<usize, WriteError>> {
585 self.poll_write_generic(cx, |write_buf| {
586 let (mut total_len, mut total_written) = (0, 0);
587 for buf in chunks {
588 total_len += buf.len();
589 total_written += write_buf.put_zerocopy(buf);
590 }
591 if total_written == total_len && !fin {
592 WriteStatus::Writable(total_written)
593 } else {
594 (Some(total_written), fin).into()
595 }
596 })
597 .map(|res| res.map(|x| x.unwrap_or(0)))
598 }
599
600 fn write_chunks<'a>(&'a self, chunks: &'a [Bytes], fin: bool) -> WriteChunks<'a> {
601 WriteChunks {
602 stream: self,
603 chunks,
604 fin,
605 }
606 }
607
608 fn poll_write_generic<T, U>(
609 &self,
610 _cx: &mut Context<'_>,
611 mut write_fn: T,
612 ) -> Poll<Result<Option<U>, WriteError>>
613 where
614 T: FnMut(&mut WriteBuffer) -> WriteStatus<U>,
615 {
616 let mut exclusive = self.inner.exclusive.lock().unwrap();
617 match exclusive.send_state {
618 StreamSendState::Closed => {
619 return Poll::Ready(Err(WriteError::Closed));
620 }
621 StreamSendState::Start => {
622 exclusive.start_waiters.push(_cx.waker().clone());
623 return Poll::Pending;
624 }
625 StreamSendState::StartComplete => {}
626 StreamSendState::Shutdown => {
627 return Poll::Ready(Err(WriteError::Finished));
628 }
629 StreamSendState::ShutdownComplete => {
630 if let Some(conn_error) = &exclusive.conn_error {
631 return Poll::Ready(Err(WriteError::ConnectionLost(conn_error.clone())));
632 } else if let Some(error_code) = &exclusive.send_error_code {
633 return Poll::Ready(Err(WriteError::Stopped(*error_code)));
634 } else {
635 return Poll::Ready(Err(WriteError::Finished));
636 }
637 }
638 }
639 let mut write_buf = exclusive.write_pool.pop().unwrap_or(WriteBuffer::new());
640 let status = write_fn(&mut write_buf);
641 let buffers = unsafe {
642 let (data, len) = write_buf.get_buffers();
643 std::slice::from_raw_parts(data, len)
644 };
645 match status {
646 WriteStatus::Writable(val) | WriteStatus::Blocked(Some(val)) => {
647 match unsafe {
648 self.msquic_stream.send(
649 buffers,
650 msquic::SendFlags::NONE,
651 write_buf.into_raw() as *const _,
652 )
653 }
654 .map_err(WriteError::OtherError)
655 {
656 Ok(()) => Poll::Ready(Ok(Some(val))),
657 Err(e) => Poll::Ready(Err(e)),
658 }
659 }
660 WriteStatus::Blocked(None) => unreachable!(),
661 WriteStatus::Finished(val) => {
662 match unsafe {
663 self.msquic_stream.send(
664 buffers,
665 msquic::SendFlags::FIN,
666 write_buf.into_raw() as *const _,
667 )
668 }
669 .map_err(WriteError::OtherError)
670 {
671 Ok(()) => {
672 exclusive.send_state = StreamSendState::Shutdown;
673 Poll::Ready(Ok(val))
674 }
675 Err(e) => Poll::Ready(Err(e)),
676 }
677 }
678 }
679 }
680
681 fn poll_finish_write(&self, cx: &mut Context<'_>) -> Poll<Result<(), WriteError>> {
682 let mut exclusive = self.inner.exclusive.lock().unwrap();
683 match exclusive.send_state {
684 StreamSendState::Start => {
685 exclusive.start_waiters.push(cx.waker().clone());
686 return Poll::Pending;
687 }
688 StreamSendState::StartComplete => {
689 match self
690 .msquic_stream
691 .shutdown(msquic::StreamShutdownFlags::GRACEFUL, 0)
692 .map_err(WriteError::OtherError)
693 {
694 Ok(()) => {
695 exclusive.send_state = StreamSendState::Shutdown;
696 }
697 Err(e) => return Poll::Ready(Err(e)),
698 }
699 }
700 StreamSendState::Shutdown => {}
701 StreamSendState::ShutdownComplete => {
702 if let Some(conn_error) = &exclusive.conn_error {
703 return Poll::Ready(Err(WriteError::ConnectionLost(conn_error.clone())));
704 } else if let Some(error_code) = &exclusive.send_error_code {
705 return Poll::Ready(Err(WriteError::Stopped(*error_code)));
706 } else {
707 return Poll::Ready(Ok(()));
708 }
709 }
710 _ => {
711 return Poll::Ready(Err(WriteError::Closed));
712 }
713 }
714 exclusive.write_shutdown_waiters.push(cx.waker().clone());
715 Poll::Pending
716 }
717
718 fn poll_abort_write(
719 &self,
720 cx: &mut Context<'_>,
721 error_code: u64,
722 ) -> Poll<Result<(), WriteError>> {
723 let mut exclusive = self.inner.exclusive.lock().unwrap();
724 match exclusive.send_state {
725 StreamSendState::Start => {
726 exclusive.start_waiters.push(cx.waker().clone());
727 return Poll::Pending;
728 }
729 StreamSendState::StartComplete => {
730 match self
731 .msquic_stream
732 .shutdown(msquic::StreamShutdownFlags::ABORT_SEND, error_code)
733 .map_err(WriteError::OtherError)
734 {
735 Ok(()) => {
736 exclusive.send_state = StreamSendState::Shutdown;
737 }
738 Err(e) => return Poll::Ready(Err(e)),
739 }
740 }
741 StreamSendState::Shutdown => {}
742 StreamSendState::ShutdownComplete => {
743 if let Some(conn_error) = &exclusive.conn_error {
744 return Poll::Ready(Err(WriteError::ConnectionLost(conn_error.clone())));
745 } else if let Some(error_code) = &exclusive.send_error_code {
746 return Poll::Ready(Err(WriteError::Stopped(*error_code)));
747 } else {
748 return Poll::Ready(Ok(()));
749 }
750 }
751 _ => {
752 return Poll::Ready(Err(WriteError::Closed));
753 }
754 }
755 exclusive.write_shutdown_waiters.push(cx.waker().clone());
756 Poll::Pending
757 }
758
759 fn abort_write(&self, error_code: u64) -> Result<(), WriteError> {
760 let mut exclusive = self.inner.exclusive.lock().unwrap();
761 match exclusive.send_state {
762 StreamSendState::StartComplete => {
763 self.msquic_stream
764 .shutdown(msquic::StreamShutdownFlags::ABORT_SEND, error_code)
765 .map_err(WriteError::OtherError)?;
766 exclusive.send_state = StreamSendState::Shutdown;
767 Ok(())
768 }
769 _ => Err(WriteError::Closed),
770 }
771 }
772
773 fn poll_abort_read(
774 &self,
775 cx: &mut Context<'_>,
776 error_code: u64,
777 ) -> Poll<Result<(), ReadError>> {
778 let mut exclusive = self.inner.exclusive.lock().unwrap();
779 match exclusive.recv_state {
780 StreamRecvState::Start => {
781 exclusive.start_waiters.push(cx.waker().clone());
782 Poll::Pending
783 }
784 StreamRecvState::StartComplete => {
785 match self
786 .msquic_stream
787 .shutdown(msquic::StreamShutdownFlags::ABORT_RECEIVE, error_code)
788 .map_err(ReadError::OtherError)
789 {
790 Ok(()) => {
791 exclusive.recv_state = StreamRecvState::ShutdownComplete;
792 exclusive
793 .read_waiters
794 .drain(..)
795 .for_each(|waker| waker.wake());
796 Poll::Ready(Ok(()))
797 }
798 Err(e) => Poll::Ready(Err(e)),
799 }
800 }
801 StreamRecvState::ShutdownComplete => {
802 if let Some(conn_error) = &exclusive.conn_error {
803 Poll::Ready(Err(ReadError::ConnectionLost(conn_error.clone())))
804 } else if let Some(error_code) = &exclusive.recv_error_code {
805 Poll::Ready(Err(ReadError::Reset(*error_code)))
806 } else {
807 Poll::Ready(Ok(()))
808 }
809 }
810 _ => Poll::Ready(Err(ReadError::Closed)),
811 }
812 }
813
814 fn abort_read(&self, error_code: u64) -> Result<(), ReadError> {
815 let mut exclusive = self.inner.exclusive.lock().unwrap();
816 match exclusive.recv_state {
817 StreamRecvState::StartComplete => {
818 self.msquic_stream
819 .shutdown(msquic::StreamShutdownFlags::ABORT_RECEIVE, error_code)
820 .map_err(ReadError::OtherError)?;
821 exclusive.recv_state = StreamRecvState::ShutdownComplete;
822 }
823 _ => {
824 return Err(ReadError::Closed);
825 }
826 }
827 Ok(())
828 }
829
830 pub(crate) fn read_complete(&self, buffer: &StreamRecvBuffer) {
831 let buffer_range = buffer.range();
832 trace!(
833 "StreamInstance({:p}) read complete offset={} len={}",
834 self,
835 buffer_range.start,
836 buffer_range.end - buffer_range.start
837 );
838
839 let mut exclusive = self.inner.exclusive.lock().unwrap();
840 if !buffer_range.is_empty() {
841 exclusive.read_complete_map.insert(buffer_range);
842 }
843 let complete_len = if let Some(complete_range) = exclusive.read_complete_map.first() {
844 trace!(
845 "StreamInstance({:p}) complete read offset={} len={}",
846 self,
847 complete_range.start,
848 complete_range.end - complete_range.start
849 );
850
851 if complete_range.start == 0 && exclusive.read_complete_cursor < complete_range.end {
852 let complete_len = complete_range.end - exclusive.read_complete_cursor;
853 exclusive.read_complete_cursor = complete_range.end;
854 Some(complete_len)
855 } else if complete_range.start == 0
856 && exclusive.read_complete_cursor == complete_range.end
857 && buffer.offset() == complete_range.end
858 && buffer.is_empty()
859 && buffer.fin()
860 {
861 Some(0)
862 } else {
863 None
864 }
865 } else if buffer.is_empty() && buffer.fin() {
866 Some(0)
867 } else {
868 None
869 };
870 if let Some(complete_len) = complete_len {
871 trace!(
872 "StreamInstance({:p}) call receive_complete len={}",
873 self,
874 complete_len
875 );
876 self.msquic_stream.receive_complete(complete_len as u64);
877 }
878 }
879}
880
881impl Drop for StreamInstance {
882 fn drop(&mut self) {
883 trace!("StreamInstance({:p}) dropping", self);
884 let exclusive = self.inner.exclusive.lock().unwrap();
885 match exclusive.state {
886 StreamState::Start | StreamState::StartComplete => {
887 trace!(
888 "StreamInstance(Inner: {:p}) shutdown while dropping",
889 self.inner
890 );
891 }
898 _ => {}
899 }
900 }
901}
902
903#[derive(Debug)]
904struct StreamInner {
905 exclusive: Mutex<StreamInnerExclusive>,
906 pub(crate) shared: StreamInnerShared,
907}
908
909struct StreamInnerExclusive {
910 state: StreamState,
911 start_status: Option<msquic::Status>,
912 recv_state: StreamRecvState,
913 recv_buffers: VecDeque<StreamRecvBuffer>,
914 recv_len: usize,
915 read_complete_map: RangeSet<usize>,
916 read_complete_cursor: usize,
917 send_state: StreamSendState,
918 write_pool: Vec<WriteBuffer>,
919 recv_error_code: Option<u64>,
920 send_error_code: Option<u64>,
921 conn_error: Option<ConnectionError>,
922 start_waiters: Vec<Waker>,
923 read_waiters: Vec<Waker>,
924 write_shutdown_waiters: Vec<Waker>,
925}
926
927struct StreamInnerShared {
928 stream_type: StreamType,
929 local_open: bool,
930 id: RwLock<Option<u64>>,
931}
932
933#[derive(Debug, PartialEq)]
934enum StreamState {
935 Open,
936 Start,
937 StartComplete,
938 ShutdownComplete,
939}
940
941#[derive(Debug, PartialEq)]
942enum StreamRecvState {
943 Closed,
944 Start,
945 StartComplete,
946 Shutdown,
947 ShutdownComplete,
948}
949
950#[derive(Debug, PartialEq)]
951enum StreamSendState {
952 Closed,
953 Start,
954 StartComplete,
955 Shutdown,
956 ShutdownComplete,
957}
958
959impl StreamInner {
960 fn new(
961 stream_type: StreamType,
962 send_state: StreamSendState,
963 recv_state: StreamRecvState,
964 local_open: bool,
965 ) -> Self {
966 Self {
967 exclusive: Mutex::new(StreamInnerExclusive {
968 state: StreamState::Open,
969 start_status: None,
970 recv_state,
971 recv_buffers: VecDeque::new(),
972 recv_len: 0,
973 read_complete_map: RangeSet::new(),
974 read_complete_cursor: 0,
975 send_state,
976 write_pool: Vec::new(),
977 recv_error_code: None,
978 send_error_code: None,
979 conn_error: None,
980 start_waiters: Vec::new(),
981 read_waiters: Vec::new(),
982 write_shutdown_waiters: Vec::new(),
983 }),
984 shared: StreamInnerShared {
985 local_open,
986 id: RwLock::new(None),
987 stream_type,
988 },
989 }
990 }
991
992 fn handle_event_start_complete(
993 &self,
994 status: msquic::Status,
995 id: u64,
996 peer_accepted: bool,
997 ) -> Result<(), msquic::Status> {
998 if status.is_ok() {
999 self.shared.id.write().unwrap().replace(id);
1000 }
1001 trace!(
1002 "StreamInner({:p}, id={:?}) start complete status={:?}, peer_accepted={}, id={}",
1003 self,
1004 self.shared.id.read(),
1005 status,
1006 peer_accepted,
1007 id,
1008 );
1009 let mut exclusive = self.exclusive.lock().unwrap();
1010 exclusive.start_status = Some(status.clone());
1011 if status.is_ok() && peer_accepted {
1012 exclusive.state = StreamState::StartComplete;
1013 if self.shared.stream_type == StreamType::Bidirectional {
1014 exclusive.recv_state = StreamRecvState::StartComplete;
1015 }
1016 exclusive.send_state = StreamSendState::StartComplete;
1017 }
1018
1019 if status.0 == msquic::StatusCode::QUIC_STATUS_STREAM_LIMIT_REACHED.into() || peer_accepted
1020 {
1021 exclusive
1022 .start_waiters
1023 .drain(..)
1024 .for_each(|waker| waker.wake());
1025 }
1026 Ok(())
1027 }
1028
1029 fn handle_event_receive(
1030 &self,
1031 absolute_offset: u64,
1032 total_buffer_length: &mut u64,
1033 buffers: &[msquic::BufferRef],
1034 flags: msquic::ReceiveFlags,
1035 ) -> Result<(), msquic::Status> {
1036 trace!(
1037 "StreamInner({:p}, id={:?}) Receive {} offsets {} bytes, fin {}",
1038 self,
1039 self.shared.id.read(),
1040 absolute_offset,
1041 total_buffer_length,
1042 (flags & msquic::ReceiveFlags::FIN) == msquic::ReceiveFlags::FIN
1043 );
1044
1045 let recv_buffer = StreamRecvBuffer::new(
1046 absolute_offset as usize,
1047 buffers,
1048 (flags & msquic::ReceiveFlags::FIN) == msquic::ReceiveFlags::FIN,
1049 );
1050
1051 let mut exclusive = self.exclusive.lock().unwrap();
1052 exclusive.recv_len += *total_buffer_length as usize;
1053 exclusive.recv_buffers.push_back(recv_buffer);
1054 exclusive
1055 .read_waiters
1056 .drain(..)
1057 .for_each(|waker| waker.wake());
1058 Err(msquic::StatusCode::QUIC_STATUS_PENDING.into())
1059 }
1060
1061 fn handle_event_send_complete(
1062 &self,
1063 _canceled: bool,
1064 client_context: *const c_void,
1065 ) -> Result<(), msquic::Status> {
1066 trace!(
1067 "StreamInner({:p}, id={:?}) Send complete",
1068 self,
1069 self.shared.id.read()
1070 );
1071
1072 let mut write_buf = unsafe { WriteBuffer::from_raw(client_context) };
1073 let mut exclusive = self.exclusive.lock().unwrap();
1074 write_buf.reset();
1075 exclusive.write_pool.push(write_buf);
1076 Ok(())
1077 }
1078
1079 fn handle_event_peer_send_shutdown(&self) -> Result<(), msquic::Status> {
1080 trace!(
1081 "StreamInner({:p}, id={:?}) Peer send shutdown",
1082 self,
1083 self.shared.id.read()
1084 );
1085 let mut exclusive = self.exclusive.lock().unwrap();
1086 exclusive.recv_state = StreamRecvState::ShutdownComplete;
1087 exclusive
1088 .read_waiters
1089 .drain(..)
1090 .for_each(|waker| waker.wake());
1091 Ok(())
1092 }
1093
1094 fn handle_event_peer_send_aborted(&self, error_code: u64) -> Result<(), msquic::Status> {
1095 trace!(
1096 "StreamInner({:p}, id={:?}) Peer send aborted",
1097 self,
1098 self.shared.id.read()
1099 );
1100 let mut exclusive = self.exclusive.lock().unwrap();
1101 exclusive.recv_state = StreamRecvState::ShutdownComplete;
1102 exclusive.recv_error_code = Some(error_code);
1103 exclusive
1104 .read_waiters
1105 .drain(..)
1106 .for_each(|waker| waker.wake());
1107 Ok(())
1108 }
1109
1110 fn handle_event_peer_receive_aborted(&self, error_code: u64) -> Result<(), msquic::Status> {
1111 trace!(
1112 "StreamInner({:p}, id={:?}) Peer receive aborted",
1113 self,
1114 self.shared.id.read()
1115 );
1116 let mut exclusive = self.exclusive.lock().unwrap();
1117 exclusive.send_state = StreamSendState::ShutdownComplete;
1118 exclusive.send_error_code = Some(error_code);
1119 exclusive
1120 .write_shutdown_waiters
1121 .drain(..)
1122 .for_each(|waker| waker.wake());
1123 Ok(())
1124 }
1125
1126 fn handle_event_send_shutdown_complete(&self, _graceful: bool) -> Result<(), msquic::Status> {
1127 trace!(
1128 "StreamInner({:p}, id={:?}) Send shutdown complete",
1129 self,
1130 self.shared.id.read()
1131 );
1132 let mut exclusive = self.exclusive.lock().unwrap();
1133 exclusive.send_state = StreamSendState::ShutdownComplete;
1134 exclusive
1135 .write_shutdown_waiters
1136 .drain(..)
1137 .for_each(|waker| waker.wake());
1138 Ok(())
1139 }
1140
1141 #[allow(clippy::too_many_arguments)]
1142 fn handle_event_shutdown_complete(
1143 &self,
1144 msquic_stream: msquic::StreamRef,
1145 connection_shutdown: bool,
1146 app_close_in_progress: bool,
1147 connection_shutdown_by_app: bool,
1148 connection_closed_remotely: bool,
1149 connection_error_code: u64,
1150 connection_close_status: msquic::Status,
1151 ) -> Result<(), msquic::Status> {
1152 trace!(
1153 "StreamInner({:p}, id={:?}) Shutdown complete",
1154 self,
1155 self.shared.id.read()
1156 );
1157 {
1158 let mut exclusive = self.exclusive.lock().unwrap();
1159
1160 if !exclusive.recv_buffers.is_empty() {
1161 trace!(
1162 "StreamInner({:p}) read complete {}",
1163 self,
1164 exclusive.recv_len - exclusive.read_complete_cursor
1165 );
1166 exclusive.recv_buffers.clear();
1167 if !app_close_in_progress {
1168 msquic_stream.receive_complete(
1169 (exclusive.recv_len - exclusive.read_complete_cursor) as u64,
1170 );
1171 }
1172 }
1173
1174 exclusive.state = StreamState::ShutdownComplete;
1175 exclusive.recv_state = StreamRecvState::ShutdownComplete;
1176 exclusive.send_state = StreamSendState::ShutdownComplete;
1177 if connection_shutdown {
1178 match (connection_shutdown_by_app, connection_closed_remotely) {
1179 (true, true) => {
1180 exclusive.conn_error =
1181 Some(ConnectionError::ShutdownByPeer(connection_error_code));
1182 }
1183 (true, false) => {
1184 exclusive.conn_error = Some(ConnectionError::ShutdownByLocal);
1185 }
1186 (false, true) | (false, false) => {
1187 exclusive.conn_error = Some(ConnectionError::ShutdownByTransport(
1188 connection_close_status,
1189 connection_error_code,
1190 ));
1191 }
1192 }
1193 }
1194 exclusive
1195 .start_waiters
1196 .drain(..)
1197 .for_each(|waker| waker.wake());
1198 exclusive
1199 .read_waiters
1200 .drain(..)
1201 .for_each(|waker| waker.wake());
1202 }
1203 Ok(())
1207 }
1208
1209 fn handle_event_ideal_send_buffer_size(&self, _byte_count: u64) -> Result<(), msquic::Status> {
1210 trace!(
1211 "StreamInner({:p}, id={:?}) Ideal send buffer size",
1212 self,
1213 self.shared.id.read()
1214 );
1215 Ok(())
1216 }
1217
1218 fn handle_event_peer_accepted(&self) -> Result<(), msquic::Status> {
1219 trace!(
1220 "StreamInner({:p}, id={:?}) Peer accepted",
1221 self,
1222 self.shared.id.read()
1223 );
1224 let mut exclusive = self.exclusive.lock().unwrap();
1225 exclusive.state = StreamState::StartComplete;
1226 if self.shared.stream_type == StreamType::Bidirectional {
1227 exclusive.recv_state = StreamRecvState::StartComplete;
1228 }
1229 exclusive.send_state = StreamSendState::StartComplete;
1230 exclusive
1231 .start_waiters
1232 .drain(..)
1233 .for_each(|waker| waker.wake());
1234 Ok(())
1235 }
1236
1237 fn callback_handler_impl(
1238 &self,
1239 msquic_stream: msquic::StreamRef,
1240 ev: msquic::StreamEvent,
1241 ) -> Result<(), msquic::Status> {
1242 match ev {
1243 msquic::StreamEvent::StartComplete {
1244 status,
1245 id,
1246 peer_accepted,
1247 } => self.handle_event_start_complete(status, id, peer_accepted),
1248 msquic::StreamEvent::Receive {
1249 absolute_offset,
1250 total_buffer_length,
1251 buffers,
1252 flags,
1253 } => self.handle_event_receive(absolute_offset, total_buffer_length, buffers, flags),
1254 msquic::StreamEvent::SendComplete {
1255 cancelled,
1256 client_context,
1257 } => self.handle_event_send_complete(cancelled, client_context),
1258 msquic::StreamEvent::PeerSendShutdown => self.handle_event_peer_send_shutdown(),
1259 msquic::StreamEvent::PeerSendAborted { error_code } => {
1260 self.handle_event_peer_send_aborted(error_code)
1261 }
1262 msquic::StreamEvent::PeerReceiveAborted { error_code } => {
1263 self.handle_event_peer_receive_aborted(error_code)
1264 }
1265 msquic::StreamEvent::SendShutdownComplete { graceful } => {
1266 self.handle_event_send_shutdown_complete(graceful)
1267 }
1268 msquic::StreamEvent::ShutdownComplete {
1269 connection_shutdown,
1270 app_close_in_progress,
1271 connection_shutdown_by_app,
1272 connection_closed_remotely,
1273 connection_error_code,
1274 connection_close_status,
1275 } => self.handle_event_shutdown_complete(
1276 msquic_stream,
1277 connection_shutdown,
1278 app_close_in_progress,
1279 connection_shutdown_by_app,
1280 connection_closed_remotely,
1281 connection_error_code,
1282 connection_close_status,
1283 ),
1284 msquic::StreamEvent::IdealSendBufferSize { byte_count } => {
1285 self.handle_event_ideal_send_buffer_size(byte_count)
1286 }
1287 msquic::StreamEvent::PeerAccepted => self.handle_event_peer_accepted(),
1288 _ => {
1289 trace!("StreamInner({:p}) Other callback", self);
1290 Ok(())
1291 }
1292 }
1293 }
1294}
1295
1296impl Drop for StreamInner {
1297 fn drop(&mut self) {
1298 trace!("StreamInner({:p}) dropping", self);
1299 }
1300}
1301
1302impl fmt::Debug for StreamInnerExclusive {
1303 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1304 f.debug_struct("Exclusive")
1305 .field("state", &self.state)
1306 .field("recv_state", &self.recv_state)
1307 .field("send_state", &self.send_state)
1308 .finish()
1309 }
1310}
1311
1312impl fmt::Debug for StreamInnerShared {
1313 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1314 f.debug_struct("Shared")
1315 .field("type", &self.stream_type)
1316 .field("id", &self.id)
1317 .finish()
1318 }
1319}
1320pub struct ReadChunk<'a> {
1321 stream: &'a Arc<StreamInstance>,
1322}
1323
1324impl Future for ReadChunk<'_> {
1325 type Output = Result<Option<StreamRecvBuffer>, ReadError>;
1326
1327 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1328 self.stream.poll_read_chunk(cx)
1329 }
1330}
1331
1332pub struct WriteChunk<'a> {
1333 stream: &'a StreamInstance,
1334 chunk: &'a Bytes,
1335 fin: bool,
1336}
1337
1338impl Future for WriteChunk<'_> {
1339 type Output = Result<usize, WriteError>;
1340
1341 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1342 self.stream.poll_write_chunk(cx, self.chunk, self.fin)
1343 }
1344}
1345
1346pub struct WriteChunks<'a> {
1347 stream: &'a StreamInstance,
1348 chunks: &'a [Bytes],
1349 fin: bool,
1350}
1351
1352impl Future for WriteChunks<'_> {
1353 type Output = Result<usize, WriteError>;
1354
1355 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1356 self.stream.poll_write_chunks(cx, self.chunks, self.fin)
1357 }
1358}
1359
1360#[cfg(feature = "tokio")]
1361impl tokio::io::AsyncRead for Stream {
1362 fn poll_read(
1363 self: Pin<&mut Self>,
1364 cx: &mut Context<'_>,
1365 buf: &mut tokio::io::ReadBuf<'_>,
1366 ) -> Poll<std::io::Result<()>> {
1367 let len = ready!(Self::poll_read(self.get_mut(), cx, buf.initialized_mut()))?;
1368 buf.set_filled(len);
1369 Poll::Ready(Ok(()))
1370 }
1371}
1372
1373#[cfg(feature = "tokio")]
1374impl tokio::io::AsyncWrite for Stream {
1375 fn poll_write(
1376 self: Pin<&mut Self>,
1377 cx: &mut Context<'_>,
1378 buf: &[u8],
1379 ) -> Poll<std::io::Result<usize>> {
1380 let len = ready!(Self::poll_write(self.get_mut(), cx, buf, false))?;
1381 Poll::Ready(Ok(len))
1382 }
1383
1384 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<std::io::Result<()>> {
1385 Poll::Ready(Ok(()))
1386 }
1387
1388 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<std::io::Result<()>> {
1389 ready!(Self::poll_finish_write(self.get_mut(), cx))?;
1390 Poll::Ready(Ok(()))
1391 }
1392}
1393
1394#[cfg(feature = "tokio")]
1395impl tokio::io::AsyncRead for ReadStream {
1396 fn poll_read(
1397 self: Pin<&mut Self>,
1398 cx: &mut Context<'_>,
1399 buf: &mut tokio::io::ReadBuf<'_>,
1400 ) -> Poll<std::io::Result<()>> {
1401 let len = ready!(Self::poll_read(self.get_mut(), cx, buf.initialized_mut()))?;
1402 buf.set_filled(len);
1403 Poll::Ready(Ok(()))
1404 }
1405}
1406
1407#[cfg(feature = "tokio")]
1408impl tokio::io::AsyncWrite for WriteStream {
1409 fn poll_write(
1410 self: Pin<&mut Self>,
1411 cx: &mut Context<'_>,
1412 buf: &[u8],
1413 ) -> Poll<std::io::Result<usize>> {
1414 let len = ready!(Self::poll_write(self.get_mut(), cx, buf, false))?;
1415 Poll::Ready(Ok(len))
1416 }
1417
1418 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<std::io::Result<()>> {
1419 Poll::Ready(Ok(()))
1420 }
1421
1422 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<std::io::Result<()>> {
1423 ready!(Self::poll_finish_write(self.get_mut(), cx))?;
1424 Poll::Ready(Ok(()))
1425 }
1426}
1427
1428impl futures_io::AsyncRead for Stream {
1429 fn poll_read(
1430 self: Pin<&mut Self>,
1431 cx: &mut Context<'_>,
1432 buf: &mut [u8],
1433 ) -> Poll<std::io::Result<usize>> {
1434 let len = ready!(Self::poll_read(self.get_mut(), cx, buf))?;
1435 Poll::Ready(Ok(len))
1436 }
1437}
1438
1439impl futures_io::AsyncWrite for Stream {
1440 fn poll_write(
1441 self: Pin<&mut Self>,
1442 cx: &mut Context<'_>,
1443 buf: &[u8],
1444 ) -> Poll<std::io::Result<usize>> {
1445 let len = ready!(Self::poll_write(self.get_mut(), cx, buf, false))?;
1446 Poll::Ready(Ok(len))
1447 }
1448
1449 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1450 Poll::Ready(Ok(()))
1451 }
1452
1453 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1454 ready!(Self::poll_finish_write(self.get_mut(), cx))?;
1455 Poll::Ready(Ok(()))
1456 }
1457}
1458
1459impl futures_io::AsyncRead for ReadStream {
1460 fn poll_read(
1461 self: Pin<&mut Self>,
1462 cx: &mut Context<'_>,
1463 buf: &mut [u8],
1464 ) -> Poll<std::io::Result<usize>> {
1465 let len = ready!(Self::poll_read(self.get_mut(), cx, buf))?;
1466 Poll::Ready(Ok(len))
1467 }
1468}
1469
1470impl futures_io::AsyncWrite for WriteStream {
1471 fn poll_write(
1472 self: Pin<&mut Self>,
1473 cx: &mut Context<'_>,
1474 buf: &[u8],
1475 ) -> Poll<std::io::Result<usize>> {
1476 let len = ready!(Self::poll_write(self.get_mut(), cx, buf, false))?;
1477 Poll::Ready(Ok(len))
1478 }
1479
1480 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1481 Poll::Ready(Ok(()))
1482 }
1483
1484 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1485 ready!(Self::poll_finish_write(self.get_mut(), cx))?;
1486 Poll::Ready(Ok(()))
1487 }
1488}
1489
1490enum ReadStatus<T> {
1491 Readable(T),
1492 Finished(Option<T>),
1493 Blocked(Option<T>),
1494}
1495
1496impl<T> From<(Option<T>, bool)> for ReadStatus<T> {
1497 fn from(status: (Option<T>, bool)) -> Self {
1498 match status {
1499 (read, true) => Self::Finished(read),
1500 (read, false) => Self::Blocked(read),
1501 }
1502 }
1503}
1504
1505enum WriteStatus<T> {
1506 Writable(T),
1507 Finished(Option<T>),
1508 Blocked(Option<T>),
1509}
1510
1511impl<T> From<(Option<T>, bool)> for WriteStatus<T> {
1512 fn from(status: (Option<T>, bool)) -> Self {
1513 match status {
1514 (write, true) => Self::Finished(write),
1515 (write, false) => Self::Blocked(write),
1516 }
1517 }
1518}
1519
1520#[derive(Debug, Error, Clone)]
1521pub enum StartError {
1522 #[error("connection not started yet")]
1523 ConnectionNotStarted,
1524 #[error("reach stream count limit")]
1525 LimitReached,
1526 #[error("connection lost")]
1527 ConnectionLost(#[from] ConnectionError),
1528 #[error("other error: status {0:?}")]
1529 OtherError(msquic::Status),
1530}
1531
1532#[derive(Debug, Error, Clone)]
1533pub enum ReadError {
1534 #[error("stream not opened for reading")]
1535 Closed,
1536 #[error("stream reset by peer: error {0}")]
1537 Reset(u64),
1538 #[error("connection lost")]
1539 ConnectionLost(#[from] ConnectionError),
1540 #[error("other error: status {0:?}")]
1541 OtherError(msquic::Status),
1542}
1543
1544impl From<ReadError> for std::io::Error {
1545 fn from(e: ReadError) -> Self {
1546 let kind = match e {
1547 ReadError::Closed => std::io::ErrorKind::NotConnected,
1548 ReadError::Reset(_) => std::io::ErrorKind::ConnectionReset,
1549 ReadError::ConnectionLost(ConnectionError::ConnectionClosed) => {
1550 std::io::ErrorKind::NotConnected
1551 }
1552 ReadError::ConnectionLost(_) => std::io::ErrorKind::ConnectionAborted,
1553 ReadError::OtherError(_) => std::io::ErrorKind::Other,
1554 };
1555 Self::new(kind, e)
1556 }
1557}
1558
1559#[derive(Debug, Error, Clone)]
1560pub enum WriteError {
1561 #[error("stream not opened for writing")]
1562 Closed,
1563 #[error("stream finished")]
1564 Finished,
1565 #[error("stream stopped by peer: error {0}")]
1566 Stopped(u64),
1567 #[error("connection lost")]
1568 ConnectionLost(#[from] ConnectionError),
1569 #[error("other error: status {0:?}")]
1570 OtherError(msquic::Status),
1571}
1572
1573impl From<WriteError> for std::io::Error {
1574 fn from(e: WriteError) -> Self {
1575 let kind = match e {
1576 WriteError::Closed
1577 | WriteError::Finished
1578 | WriteError::ConnectionLost(ConnectionError::ConnectionClosed) => {
1579 std::io::ErrorKind::NotConnected
1580 }
1581 WriteError::Stopped(_) => std::io::ErrorKind::ConnectionReset,
1582 WriteError::ConnectionLost(_) => std::io::ErrorKind::ConnectionAborted,
1583 WriteError::OtherError(_) => std::io::ErrorKind::Other,
1584 };
1585 Self::new(kind, e)
1586 }
1587}