1use crate::buffer::{StreamRecvBuffer, WriteBuffer};
2use crate::connection::ConnectionError;
3
4use std::collections::VecDeque;
5use std::fmt;
6use std::future::Future;
7use std::ops::Deref;
8use std::pin::Pin;
9use std::sync::{Arc, Mutex, RwLock};
10use std::task::{ready, Context, Poll, Waker};
11
12use bytes::Bytes;
13use libc::c_void;
14use rangemap::RangeSet;
15use thiserror::Error;
16use tracing::trace;
17
/// Directionality of a stream.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum StreamType {
    /// Both endpoints get a read half and a write half (see `Stream::split`).
    Bidirectional,
    /// Only the endpoint that opened the stream gets a write half; the other
    /// endpoint gets a read half (see `Stream::split`).
    Unidirectional,
}
23
/// Combined read/write handle to a stream.
///
/// All operations delegate to a shared, reference-counted `StreamInstance`.
#[derive(Debug)]
pub struct Stream(Arc<StreamInstance>);
27
28impl Stream {
29 pub(crate) fn open(
30 msquic_conn: &msquic::Connection,
31 stream_type: StreamType,
32 ) -> Result<Self, StartError> {
33 let msquic_stream = msquic::Stream::new();
34 let flags = if stream_type == StreamType::Unidirectional {
35 msquic::STREAM_OPEN_FLAG_UNIDIRECTIONAL
36 } else {
37 msquic::STREAM_OPEN_FLAG_NONE
38 };
39 let inner = Arc::new(StreamInner::new(
40 msquic_stream,
41 stream_type,
42 StreamSendState::Closed,
43 StreamRecvState::Closed,
44 true,
45 ));
46 inner
47 .shared
48 .msquic_stream
49 .open(
50 msquic_conn,
51 flags,
52 StreamInner::native_callback,
53 Arc::into_raw(inner.clone()) as *const c_void,
54 )
55 .map_err(StartError::OtherError)?;
56 trace!("Stream({:p}) Open by local", &*inner);
57
58 Ok(Self(Arc::new(StreamInstance(inner))))
59 }
60
61 pub(crate) fn from_handle(handle: msquic::Handle, stream_type: StreamType) -> Self {
62 let msquic_stream = msquic::Stream::from_parts(handle);
63 let send_state = if stream_type == StreamType::Bidirectional {
64 StreamSendState::StartComplete
65 } else {
66 StreamSendState::Closed
67 };
68 let inner = Arc::new(StreamInner::new(
69 msquic_stream,
70 stream_type,
71 send_state,
72 StreamRecvState::StartComplete,
73 false,
74 ));
75 inner.shared.msquic_stream.set_callback_handler(
76 StreamInner::native_callback,
77 Arc::into_raw(inner.clone()) as *const c_void,
78 );
79 let stream = Self(Arc::new(StreamInstance(inner)));
80 trace!(
81 "Stream({:p}, id={:?}) Start by peer",
82 &*stream.0 .0,
83 stream.id()
84 );
85 stream
86 }
87
88 pub(crate) fn poll_start(
89 &mut self,
90 cx: &mut Context,
91 failed_on_block: bool,
92 ) -> Poll<Result<(), StartError>> {
93 let mut exclusive = self.0.exclusive.lock().unwrap();
94 match exclusive.state {
95 StreamState::Open => {
96 self.0
97 .shared
98 .msquic_stream
99 .start(
100 msquic::STREAM_START_FLAG_SHUTDOWN_ON_FAIL
101 | msquic::STREAM_START_FLAG_INDICATE_PEER_ACCEPT
102 | if failed_on_block {
103 msquic::STREAM_START_FLAG_FAIL_BLOCKED
104 } else {
105 msquic::STREAM_START_FLAG_NONE
106 },
107 )
108 .map_err(StartError::OtherError)?;
109 exclusive.state = StreamState::Start;
110 if self.0.shared.stream_type == StreamType::Bidirectional {
111 exclusive.recv_state = StreamRecvState::Start;
112 }
113 exclusive.send_state = StreamSendState::Start;
114 }
115 StreamState::Start => {}
116 _ => {
117 if let Some(start_status) = exclusive.start_status {
118 if msquic::Status::succeeded(start_status) {
119 return Poll::Ready(Ok(()));
120 }
121 return Poll::Ready(Err(match start_status {
122 msquic::QUIC_STATUS_STREAM_LIMIT_REACHED => StartError::LimitReached,
123 msquic::QUIC_STATUS_ABORTED | msquic::QUIC_STATUS_INVALID_STATE => {
124 StartError::ConnectionLost(
125 exclusive.conn_error.as_ref().expect("conn_error").clone(),
126 )
127 }
128 _ => StartError::OtherError(start_status),
129 }));
130 } else {
131 return Poll::Ready(Ok(()));
132 }
133 }
134 }
135 exclusive.start_waiters.push(cx.waker().clone());
136 Poll::Pending
137 }
138
    /// Returns the stream id, or `None` when the underlying instance cannot
    /// report one.
    pub fn id(&self) -> Option<u64> {
        self.0.id()
    }
143
144 pub fn split(self) -> (Option<ReadStream>, Option<WriteStream>) {
146 match (self.0.shared.stream_type, self.0.shared.local_open) {
147 (StreamType::Unidirectional, true) => (None, Some(WriteStream(self.0))),
148 (StreamType::Unidirectional, false) => (Some(ReadStream(self.0)), None),
149 (StreamType::Bidirectional, _) => {
150 (Some(ReadStream(self.0.clone())), Some(WriteStream(self.0)))
151 }
152 }
153 }
154
    /// Polls for received bytes to copy into `buf`, returning the number of
    /// bytes copied. Delegates to the shared stream instance.
    pub fn poll_read(
        &mut self,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize, ReadError>> {
        self.0.poll_read(cx, buf)
    }
163
    /// Polls for the next received chunk, returned as a `StreamRecvBuffer`.
    /// Delegates to the shared stream instance.
    pub fn poll_read_chunk(
        &self,
        cx: &mut Context<'_>,
    ) -> Poll<Result<Option<StreamRecvBuffer>, ReadError>> {
        self.0.poll_read_chunk(cx)
    }
171
    /// Returns a `ReadChunk` that borrows this stream.
    pub fn read_chunk(&self) -> ReadChunk<'_> {
        self.0.read_chunk()
    }
176
    /// Polls to send bytes from `buf`, returning how many were accepted.
    /// `fin` requests that the send side be finished with this write.
    /// Delegates to the shared stream instance.
    pub fn poll_write(
        &mut self,
        cx: &mut Context<'_>,
        buf: &[u8],
        fin: bool,
    ) -> Poll<Result<usize, WriteError>> {
        self.0.poll_write(cx, buf, fin)
    }
186
    /// Polls to send a single `Bytes` chunk, returning how many bytes were
    /// accepted. `fin` requests that the send side be finished with this write.
    /// Delegates to the shared stream instance.
    pub fn poll_write_chunk(
        &mut self,
        cx: &mut Context<'_>,
        chunk: &Bytes,
        fin: bool,
    ) -> Poll<Result<usize, WriteError>> {
        self.0.poll_write_chunk(cx, chunk, fin)
    }
196
    /// Returns a `WriteChunk` that borrows this stream together with `chunk`
    /// and carries the `fin` flag.
    pub fn write_chunk<'a>(&'a mut self, chunk: &'a Bytes, fin: bool) -> WriteChunk<'a> {
        self.0.write_chunk(chunk, fin)
    }
201
    /// Polls to send several `Bytes` chunks in one operation, returning the
    /// total number of bytes accepted. `fin` requests that the send side be
    /// finished with this write. Delegates to the shared stream instance.
    pub fn poll_write_chunks(
        &mut self,
        cx: &mut Context<'_>,
        chunks: &[Bytes],
        fin: bool,
    ) -> Poll<Result<usize, WriteError>> {
        self.0.poll_write_chunks(cx, chunks, fin)
    }
211
    /// Returns a `WriteChunks` that borrows this stream together with `chunks`
    /// and carries the `fin` flag.
    pub fn write_chunks<'a>(&'a mut self, chunks: &'a [Bytes], fin: bool) -> WriteChunks<'a> {
        self.0.write_chunks(chunks, fin)
    }
216
    /// Polls a graceful shutdown of the send side until it has completed.
    /// Delegates to the shared stream instance.
    pub fn poll_finish_write(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), WriteError>> {
        self.0.poll_finish_write(cx)
    }
221
    /// Polls an abortive shutdown of the send side with `error_code` until it
    /// has completed. Delegates to the shared stream instance.
    pub fn poll_abort_write(
        &mut self,
        cx: &mut Context<'_>,
        error_code: u64,
    ) -> Poll<Result<(), WriteError>> {
        self.0.poll_abort_write(cx, error_code)
    }
230
    /// Requests an abortive shutdown of the send side with `error_code`
    /// without waiting for completion.
    pub fn abort_write(&mut self, error_code: u64) -> Result<(), WriteError> {
        self.0.abort_write(error_code)
    }
235
    /// Polls an abortive shutdown of the receive side with `error_code`.
    /// Delegates to the shared stream instance.
    pub fn poll_abort_read(
        &mut self,
        cx: &mut Context<'_>,
        error_code: u64,
    ) -> Poll<Result<(), ReadError>> {
        self.0.poll_abort_read(cx, error_code)
    }
244
    /// Requests an abortive shutdown of the receive side with `error_code`.
    pub fn abort_read(&mut self, error_code: u64) -> Result<(), ReadError> {
        self.0.abort_read(error_code)
    }
249}
250
/// Read half of a stream, as produced by `Stream::split`.
#[derive(Debug)]
pub struct ReadStream(Arc<StreamInstance>);
254
// Every method below forwards to the shared `StreamInstance`.
impl ReadStream {
    /// Returns the stream id, or `None` when the underlying instance cannot
    /// report one.
    pub fn id(&self) -> Option<u64> {
        self.0.id()
    }

    /// Polls for received bytes to copy into `buf`, returning the number of
    /// bytes copied.
    pub fn poll_read(
        &mut self,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize, ReadError>> {
        self.0.poll_read(cx, buf)
    }

    /// Polls for the next received chunk, returned as a `StreamRecvBuffer`.
    pub fn poll_read_chunk(
        &self,
        cx: &mut Context<'_>,
    ) -> Poll<Result<Option<StreamRecvBuffer>, ReadError>> {
        self.0.poll_read_chunk(cx)
    }

    /// Returns a `ReadChunk` that borrows this stream.
    pub fn read_chunk(&self) -> ReadChunk<'_> {
        self.0.read_chunk()
    }

    /// Polls an abortive shutdown of the receive side with `error_code`.
    pub fn poll_abort_read(
        &mut self,
        cx: &mut Context<'_>,
        error_code: u64,
    ) -> Poll<Result<(), ReadError>> {
        self.0.poll_abort_read(cx, error_code)
    }

    /// Requests an abortive shutdown of the receive side with `error_code`.
    pub fn abort_read(&mut self, error_code: u64) -> Result<(), ReadError> {
        self.0.abort_read(error_code)
    }
}
297
/// Write half of a stream, as produced by `Stream::split`.
#[derive(Debug)]
pub struct WriteStream(Arc<StreamInstance>);
301
// Every method below forwards to the shared `StreamInstance`.
impl WriteStream {
    /// Returns the stream id, or `None` when the underlying instance cannot
    /// report one.
    pub fn id(&self) -> Option<u64> {
        self.0.id()
    }

    /// Polls to send bytes from `buf`, returning how many were accepted.
    /// `fin` requests that the send side be finished with this write.
    pub fn poll_write(
        &mut self,
        cx: &mut Context<'_>,
        buf: &[u8],
        fin: bool,
    ) -> Poll<Result<usize, WriteError>> {
        self.0.poll_write(cx, buf, fin)
    }

    /// Polls to send a single `Bytes` chunk, returning how many bytes were
    /// accepted. `fin` requests that the send side be finished with this write.
    pub fn poll_write_chunk(
        &mut self,
        cx: &mut Context<'_>,
        chunk: &Bytes,
        fin: bool,
    ) -> Poll<Result<usize, WriteError>> {
        self.0.poll_write_chunk(cx, chunk, fin)
    }

    /// Returns a `WriteChunk` that borrows this stream together with `chunk`
    /// and carries the `fin` flag.
    pub fn write_chunk<'a>(&'a mut self, chunk: &'a Bytes, fin: bool) -> WriteChunk<'a> {
        self.0.write_chunk(chunk, fin)
    }

    /// Polls to send several `Bytes` chunks in one operation, returning the
    /// total number of bytes accepted.
    pub fn poll_write_chunks(
        &mut self,
        cx: &mut Context<'_>,
        chunks: &[Bytes],
        fin: bool,
    ) -> Poll<Result<usize, WriteError>> {
        self.0.poll_write_chunks(cx, chunks, fin)
    }

    /// Returns a `WriteChunks` that borrows this stream together with `chunks`
    /// and carries the `fin` flag.
    pub fn write_chunks<'a>(&'a mut self, chunks: &'a [Bytes], fin: bool) -> WriteChunks<'a> {
        self.0.write_chunks(chunks, fin)
    }

    /// Polls a graceful shutdown of the send side until it has completed.
    pub fn poll_finish_write(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), WriteError>> {
        self.0.poll_finish_write(cx)
    }

    /// Polls an abortive shutdown of the send side with `error_code` until it
    /// has completed.
    pub fn poll_abort_write(
        &mut self,
        cx: &mut Context<'_>,
        error_code: u64,
    ) -> Poll<Result<(), WriteError>> {
        self.0.poll_abort_write(cx, error_code)
    }

    /// Requests an abortive shutdown of the send side with `error_code`
    /// without waiting for completion.
    pub fn abort_write(&mut self, error_code: u64) -> Result<(), WriteError> {
        self.0.abort_write(error_code)
    }
}
367
368impl StreamInstance {
369 pub(crate) fn id(&self) -> Option<u64> {
370 let id = { *self.0.shared.id.read().unwrap() };
371 if id.is_some() {
372 id
373 } else {
374 let id = 0u64;
375 let mut buffer_length = std::mem::size_of_val(&id) as u32;
376 let res = self.0.shared.msquic_stream.get_param(
377 msquic::PARAM_STREAM_ID,
378 &mut buffer_length as *mut _,
379 &id as *const _ as *const c_void,
380 );
381 if res.is_ok() {
382 self.0.shared.id.write().unwrap().replace(id);
383 Some(id)
384 } else {
385 None
386 }
387 }
388 }
389
390 pub(crate) fn poll_read(
391 &self,
392 cx: &mut Context<'_>,
393 buf: &mut [u8],
394 ) -> Poll<Result<usize, ReadError>> {
395 self.poll_read_generic(cx, |recv_buffers, read_complete_buffers| {
396 let mut read = 0;
397 let mut fin = false;
398 loop {
399 if read == buf.len() {
400 return ReadStatus::Readable(read);
401 }
402
403 match recv_buffers
404 .front_mut()
405 .and_then(|x| x.get_bytes_upto_size(buf.len() - read))
406 {
407 Some(slice) => {
408 let len = slice.len();
409 buf[read..read + len].copy_from_slice(slice);
410 read += len;
411 }
412 None => {
413 if let Some(mut recv_buffer) = recv_buffers.pop_front() {
414 recv_buffer.set_stream(self.0.clone());
415 fin = recv_buffer.fin();
416 read_complete_buffers.push(recv_buffer);
417 continue;
418 } else {
419 return (if read > 0 { Some(read) } else { None }, fin).into();
420 }
421 }
422 }
423 }
424 })
425 .map(|res| res.map(|x| x.unwrap_or(0)))
426 }
427
428 fn poll_read_chunk(
429 &self,
430 cx: &mut Context<'_>,
431 ) -> Poll<Result<Option<StreamRecvBuffer>, ReadError>> {
432 self.poll_read_generic(cx, |recv_buffers, _| {
433 recv_buffers
434 .pop_front()
435 .map(|mut recv_buffer| {
436 let fin = recv_buffer.fin();
437 recv_buffer.set_stream(self.0.clone());
438 (Some(recv_buffer), fin)
439 })
440 .unwrap_or((None, false))
441 .into()
442 })
443 }
444
    /// Builds a `ReadChunk` that borrows this instance.
    fn read_chunk(&self) -> ReadChunk<'_> {
        ReadChunk { stream: self }
    }
448
    /// Shared receive-side state machine behind `poll_read` and
    /// `poll_read_chunk`.
    ///
    /// `read_fn` is called with the queued receive buffers and a scratch list
    /// for buffers it has fully consumed; its `ReadStatus` decides the poll
    /// result:
    ///
    /// * `Readable` / `Blocked(Some(_))` -> `Ready(Ok(Some(_)))`
    /// * `Finished` -> receive state becomes `Shutdown`, the value is returned
    /// * `Blocked(None)` -> the waker is queued and `Pending` is returned
    ///
    /// `read_fn` is only invoked while the receive state is `StartComplete`;
    /// every other state returns early.
    fn poll_read_generic<T, U>(
        &self,
        cx: &mut Context<'_>,
        mut read_fn: T,
    ) -> Poll<Result<Option<U>, ReadError>>
    where
        T: FnMut(&mut VecDeque<StreamRecvBuffer>, &mut Vec<StreamRecvBuffer>) -> ReadStatus<U>,
    {
        let res;
        // Declared before the lock scope so that the consumed buffers are
        // dropped only after the mutex guard has been released.
        // NOTE(review): presumably dropping a buffer calls back into
        // `read_complete`, which takes the same lock - confirm in the buffer
        // module.
        let mut read_complete_buffers = Vec::new();
        {
            let mut exclusive = self.0.exclusive.lock().unwrap();
            match exclusive.recv_state {
                StreamRecvState::Closed => {
                    return Poll::Ready(Err(ReadError::Closed));
                }
                StreamRecvState::Start => {
                    // Start has not resolved yet; wait for it.
                    exclusive.start_waiters.push(cx.waker().clone());
                    return Poll::Pending;
                }
                StreamRecvState::StartComplete => {}
                StreamRecvState::Shutdown => {
                    return Poll::Ready(Ok(None));
                }
                StreamRecvState::ShutdownComplete => {
                    // A connection error takes precedence over a stream reset.
                    if let Some(conn_error) = &exclusive.conn_error {
                        return Poll::Ready(Err(ReadError::ConnectionLost(conn_error.clone())));
                    } else if let Some(error_code) = &exclusive.recv_error_code {
                        return Poll::Ready(Err(ReadError::Reset(*error_code)));
                    } else {
                        return Poll::Ready(Ok(None));
                    }
                }
            }

            let status = read_fn(&mut exclusive.recv_buffers, &mut read_complete_buffers);

            res = match status {
                ReadStatus::Readable(read) | ReadStatus::Blocked(Some(read)) => {
                    Poll::Ready(Ok(Some(read)))
                }
                ReadStatus::Finished(read) => {
                    exclusive.recv_state = StreamRecvState::Shutdown;
                    Poll::Ready(Ok(read))
                }
                ReadStatus::Blocked(None) => {
                    exclusive.read_waiters.push(cx.waker().clone());
                    Poll::Pending
                }
            };
        }
        res
    }
502
503 pub(crate) fn poll_write(
504 &self,
505 cx: &mut Context<'_>,
506 buf: &[u8],
507 fin: bool,
508 ) -> Poll<Result<usize, WriteError>> {
509 self.poll_write_generic(cx, |write_buf| {
510 let written = write_buf.put_slice(buf);
511 if written == buf.len() && !fin {
512 WriteStatus::Writable(written)
513 } else {
514 (Some(written), fin).into()
515 }
516 })
517 .map(|res| res.map(|x| x.unwrap_or(0)))
518 }
519
520 pub(crate) fn poll_write_chunk(
521 &self,
522 cx: &mut Context<'_>,
523 chunk: &Bytes,
524 fin: bool,
525 ) -> Poll<Result<usize, WriteError>> {
526 self.poll_write_generic(cx, |write_buf| {
527 let written = write_buf.put_zerocopy(chunk);
528 if written == chunk.len() && !fin {
529 WriteStatus::Writable(written)
530 } else {
531 (Some(written), fin).into()
532 }
533 })
534 .map(|res| res.map(|x| x.unwrap_or(0)))
535 }
536
    /// Builds a `WriteChunk` that borrows this instance and `chunk` and
    /// carries the `fin` flag.
    pub(crate) fn write_chunk<'a>(&'a self, chunk: &'a Bytes, fin: bool) -> WriteChunk<'a> {
        WriteChunk {
            stream: self,
            chunk,
            fin,
        }
    }
544
545 fn poll_write_chunks(
546 &self,
547 cx: &mut Context<'_>,
548 chunks: &[Bytes],
549 fin: bool,
550 ) -> Poll<Result<usize, WriteError>> {
551 self.poll_write_generic(cx, |write_buf| {
552 let (mut total_len, mut total_written) = (0, 0);
553 for buf in chunks {
554 total_len += buf.len();
555 total_written += write_buf.put_zerocopy(buf);
556 }
557 if total_written == total_len && !fin {
558 WriteStatus::Writable(total_written)
559 } else {
560 (Some(total_written), fin).into()
561 }
562 })
563 .map(|res| res.map(|x| x.unwrap_or(0)))
564 }
565
    /// Builds a `WriteChunks` that borrows this instance and `chunks` and
    /// carries the `fin` flag.
    pub(crate) fn write_chunks<'a>(&'a self, chunks: &'a [Bytes], fin: bool) -> WriteChunks<'a> {
        WriteChunks {
            stream: self,
            chunks,
            fin,
        }
    }
573
574 fn poll_write_generic<T, U>(
575 &self,
576 _cx: &mut Context<'_>,
577 mut write_fn: T,
578 ) -> Poll<Result<Option<U>, WriteError>>
579 where
580 T: FnMut(&mut WriteBuffer) -> WriteStatus<U>,
581 {
582 let mut exclusive = self.0.exclusive.lock().unwrap();
583 match exclusive.send_state {
584 StreamSendState::Closed => {
585 return Poll::Ready(Err(WriteError::Closed));
586 }
587 StreamSendState::Start => {
588 exclusive.start_waiters.push(_cx.waker().clone());
589 return Poll::Pending;
590 }
591 StreamSendState::StartComplete => {}
592 StreamSendState::Shutdown => {
593 return Poll::Ready(Err(WriteError::Finished));
594 }
595 StreamSendState::ShutdownComplete => {
596 if let Some(conn_error) = &exclusive.conn_error {
597 return Poll::Ready(Err(WriteError::ConnectionLost(conn_error.clone())));
598 } else if let Some(error_code) = &exclusive.send_error_code {
599 return Poll::Ready(Err(WriteError::Stopped(*error_code)));
600 } else {
601 return Poll::Ready(Err(WriteError::Finished));
602 }
603 }
604 }
605 let mut write_buf = exclusive.write_pool.pop().unwrap_or(WriteBuffer::new());
606 let status = write_fn(&mut write_buf);
607 let (buffer, buffer_count) = write_buf.get_buffer();
608 match status {
609 WriteStatus::Writable(val) | WriteStatus::Blocked(Some(val)) => {
610 match self
611 .0
612 .shared
613 .msquic_stream
614 .send(
615 unsafe { &*buffer },
616 buffer_count,
617 msquic::SEND_FLAG_NONE,
618 write_buf.into_raw() as *const _,
619 )
620 .map_err(WriteError::OtherError)
621 {
622 Ok(()) => Poll::Ready(Ok(Some(val))),
623 Err(e) => Poll::Ready(Err(e)),
624 }
625 }
626 WriteStatus::Blocked(None) => unreachable!(),
627 WriteStatus::Finished(val) => {
628 match self
629 .0
630 .shared
631 .msquic_stream
632 .send(
633 unsafe { &*buffer },
634 buffer_count,
635 msquic::SEND_FLAG_FIN,
636 write_buf.into_raw() as *const _,
637 )
638 .map_err(WriteError::OtherError)
639 {
640 Ok(()) => {
641 exclusive.send_state = StreamSendState::Shutdown;
642 Poll::Ready(Ok(val))
643 }
644 Err(e) => Poll::Ready(Err(e)),
645 }
646 }
647 }
648 }
649
650 pub(crate) fn poll_finish_write(&self, cx: &mut Context<'_>) -> Poll<Result<(), WriteError>> {
651 let mut exclusive = self.0.exclusive.lock().unwrap();
652 match exclusive.send_state {
653 StreamSendState::Start => {
654 exclusive.start_waiters.push(cx.waker().clone());
655 return Poll::Pending;
656 }
657 StreamSendState::StartComplete => {
658 match self
659 .0
660 .shared
661 .msquic_stream
662 .shutdown(msquic::STREAM_SHUTDOWN_FLAG_GRACEFUL, 0)
663 .map_err(WriteError::OtherError)
664 {
665 Ok(()) => {
666 exclusive.send_state = StreamSendState::Shutdown;
667 }
668 Err(e) => return Poll::Ready(Err(e)),
669 }
670 }
671 StreamSendState::Shutdown => {}
672 StreamSendState::ShutdownComplete => {
673 if let Some(conn_error) = &exclusive.conn_error {
674 return Poll::Ready(Err(WriteError::ConnectionLost(conn_error.clone())));
675 } else if let Some(error_code) = &exclusive.send_error_code {
676 return Poll::Ready(Err(WriteError::Stopped(*error_code)));
677 } else {
678 return Poll::Ready(Ok(()));
679 }
680 }
681 _ => {
682 return Poll::Ready(Err(WriteError::Closed));
683 }
684 }
685 exclusive.write_shutdown_waiters.push(cx.waker().clone());
686 Poll::Pending
687 }
688
689 pub(crate) fn poll_abort_write(
690 &self,
691 cx: &mut Context<'_>,
692 error_code: u64,
693 ) -> Poll<Result<(), WriteError>> {
694 let mut exclusive = self.0.exclusive.lock().unwrap();
695 match exclusive.send_state {
696 StreamSendState::Start => {
697 exclusive.start_waiters.push(cx.waker().clone());
698 return Poll::Pending;
699 }
700 StreamSendState::StartComplete => {
701 match self
702 .0
703 .shared
704 .msquic_stream
705 .shutdown(msquic::STREAM_SHUTDOWN_FLAG_ABORT_SEND, error_code)
706 .map_err(WriteError::OtherError)
707 {
708 Ok(()) => {
709 exclusive.send_state = StreamSendState::Shutdown;
710 }
711 Err(e) => return Poll::Ready(Err(e)),
712 }
713 }
714 StreamSendState::Shutdown => {}
715 StreamSendState::ShutdownComplete => {
716 if let Some(conn_error) = &exclusive.conn_error {
717 return Poll::Ready(Err(WriteError::ConnectionLost(conn_error.clone())));
718 } else if let Some(error_code) = &exclusive.send_error_code {
719 return Poll::Ready(Err(WriteError::Stopped(*error_code)));
720 } else {
721 return Poll::Ready(Ok(()));
722 }
723 }
724 _ => {
725 return Poll::Ready(Err(WriteError::Closed));
726 }
727 }
728 exclusive.write_shutdown_waiters.push(cx.waker().clone());
729 Poll::Pending
730 }
731
732 pub(crate) fn abort_write(&self, error_code: u64) -> Result<(), WriteError> {
733 let mut exclusive = self.0.exclusive.lock().unwrap();
734 match exclusive.send_state {
735 StreamSendState::StartComplete => {
736 self.0
737 .shared
738 .msquic_stream
739 .shutdown(msquic::STREAM_SHUTDOWN_FLAG_ABORT_SEND, error_code)
740 .map_err(WriteError::OtherError)?;
741 exclusive.send_state = StreamSendState::Shutdown;
742 Ok(())
743 }
744 _ => Err(WriteError::Closed),
745 }
746 }
747
748 pub(crate) fn poll_abort_read(
749 &self,
750 cx: &mut Context<'_>,
751 error_code: u64,
752 ) -> Poll<Result<(), ReadError>> {
753 let mut exclusive = self.0.exclusive.lock().unwrap();
754 match exclusive.recv_state {
755 StreamRecvState::Start => {
756 exclusive.start_waiters.push(cx.waker().clone());
757 Poll::Pending
758 }
759 StreamRecvState::StartComplete => {
760 match self
761 .0
762 .shared
763 .msquic_stream
764 .shutdown(msquic::STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, error_code)
765 .map_err(ReadError::OtherError)
766 {
767 Ok(()) => {
768 exclusive.recv_state = StreamRecvState::ShutdownComplete;
769 exclusive
770 .read_waiters
771 .drain(..)
772 .for_each(|waker| waker.wake());
773 Poll::Ready(Ok(()))
774 }
775 Err(e) => Poll::Ready(Err(e)),
776 }
777 }
778 StreamRecvState::ShutdownComplete => {
779 if let Some(conn_error) = &exclusive.conn_error {
780 Poll::Ready(Err(ReadError::ConnectionLost(conn_error.clone())))
781 } else if let Some(error_code) = &exclusive.recv_error_code {
782 Poll::Ready(Err(ReadError::Reset(*error_code)))
783 } else {
784 Poll::Ready(Ok(()))
785 }
786 }
787 _ => Poll::Ready(Err(ReadError::Closed)),
788 }
789 }
790
791 pub(crate) fn abort_read(&self, error_code: u64) -> Result<(), ReadError> {
792 let mut exclusive = self.0.exclusive.lock().unwrap();
793 match exclusive.recv_state {
794 StreamRecvState::StartComplete => {
795 self.0
796 .shared
797 .msquic_stream
798 .shutdown(msquic::STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, error_code)
799 .map_err(ReadError::OtherError)?;
800 exclusive.recv_state = StreamRecvState::ShutdownComplete;
801 }
802 _ => {
803 return Err(ReadError::Closed);
804 }
805 }
806 Ok(())
807 }
808}
/// User-facing owner of the shared stream state.
///
/// Its `Drop` implementation aborts a stream that is still starting or
/// started.
// NOTE(review): `Clone` is derived, so each clone would run that `Drop` logic
// independently; the handles in this file share one instance through
// `Arc<StreamInstance>` - confirm no caller clones it directly.
#[derive(Clone, Debug)]
struct StreamInstance(Arc<StreamInner>);
811
812impl Drop for StreamInstance {
813 fn drop(&mut self) {
814 trace!("StreamInstance({:p}) dropping", &*self.0);
815 let mut exclusive = self.0.exclusive.lock().unwrap();
816 exclusive.recv_buffers.clear();
817 match exclusive.state {
818 StreamState::Start | StreamState::StartComplete => {
819 trace!("StreamInstance({:p}) shutdown while dropping", &*self.0);
820 let _ = self.0.shared.msquic_stream.shutdown(
821 msquic::STREAM_SHUTDOWN_FLAG_ABORT_SEND
822 | msquic::STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE
823 | msquic::STREAM_SHUTDOWN_FLAG_IMMEDIATE,
824 0,
825 );
826 }
827 _ => {}
828 }
829 }
830}
831
/// Lets callers reach `StreamInner` fields and methods directly through a
/// `StreamInstance`.
impl Deref for StreamInstance {
    type Target = StreamInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
839
/// State shared between the user-facing handles and the msquic callback.
#[derive(Debug)]
pub(crate) struct StreamInner {
    /// Mutable state, guarded by a mutex.
    exclusive: Mutex<StreamInnerExclusive>,
    /// State that is read without taking the mutex.
    pub(crate) shared: StreamInnerShared,
}
845
/// Mutable stream state; always accessed through `StreamInner::exclusive`.
struct StreamInnerExclusive {
    /// Overall lifecycle state of the stream.
    state: StreamState,
    /// Status reported by the start-complete event, once received.
    start_status: Option<u32>,
    /// State of the receive direction.
    recv_state: StreamRecvState,
    /// Buffers queued by receive events and not yet handed to a reader.
    recv_buffers: VecDeque<StreamRecvBuffer>,
    /// Byte ranges that `read_complete` has been called for.
    read_complete_map: RangeSet<usize>,
    /// End of the range starting at offset zero that has already been counted
    /// towards `receive_complete`.
    read_complete_cursor: usize,
    /// State of the send direction.
    send_state: StreamSendState,
    /// Write buffers returned by send-complete events, kept for reuse.
    write_pool: Vec<WriteBuffer>,
    /// Error code from a peer-send-aborted event.
    recv_error_code: Option<u64>,
    /// Error code from a peer-receive-aborted event.
    send_error_code: Option<u64>,
    /// Connection error recorded when shutdown-complete reports a connection
    /// shutdown.
    conn_error: Option<ConnectionError>,
    /// Tasks waiting for the stream start to resolve.
    start_waiters: Vec<Waker>,
    /// Tasks waiting for receive data or a receive-state change.
    read_waiters: Vec<Waker>,
    /// Tasks waiting for the send direction to finish shutting down.
    write_shutdown_waiters: Vec<Waker>,
}
862
/// Stream state that is accessed without the `exclusive` mutex.
pub(crate) struct StreamInnerShared {
    /// Directionality chosen when the stream was created.
    stream_type: StreamType,
    /// `true` when the stream was opened locally, `false` when it was wrapped
    /// from a peer-started handle.
    local_open: bool,
    /// Cached stream id, filled in on start completion or on first query.
    id: RwLock<Option<u64>>,
    /// The underlying msquic stream object.
    pub(crate) msquic_stream: msquic::Stream,
}
869
/// Overall lifecycle of a stream.
#[derive(Debug, PartialEq)]
enum StreamState {
    /// Created; the native start has not been issued through `poll_start`.
    Open,
    /// Native start issued, waiting for it to resolve.
    Start,
    /// Start succeeded and the peer accepted the stream.
    StartComplete,
    /// The shutdown-complete event has been received.
    ShutdownComplete,
}
877
/// Lifecycle of the receive direction.
#[derive(Debug, PartialEq)]
enum StreamRecvState {
    /// Reads are rejected with `ReadError::Closed`.
    Closed,
    /// Waiting for the stream start to resolve.
    Start,
    /// Reads are served from the queued receive buffers.
    StartComplete,
    /// A reader has consumed the final data; further reads return `Ok(None)`.
    Shutdown,
    /// The receive direction has ended (peer shutdown/abort, local abort, or
    /// stream shutdown).
    ShutdownComplete,
}
886
/// Lifecycle of the send direction.
#[derive(Debug, PartialEq)]
enum StreamSendState {
    /// Writes are rejected with `WriteError::Closed`.
    Closed,
    /// Waiting for the stream start to resolve.
    Start,
    /// Writes are accepted.
    StartComplete,
    /// A FIN, graceful shutdown, or abort has been requested locally.
    Shutdown,
    /// The send direction has ended (send shutdown completed, peer aborted
    /// receiving, or stream shutdown).
    ShutdownComplete,
}
895
896impl StreamInner {
897 fn new(
898 msquic_stream: msquic::Stream,
899 stream_type: StreamType,
900 send_state: StreamSendState,
901 recv_state: StreamRecvState,
902 local_open: bool,
903 ) -> Self {
904 Self {
905 exclusive: Mutex::new(StreamInnerExclusive {
906 state: StreamState::Open,
907 start_status: None,
908 recv_state,
909 recv_buffers: VecDeque::new(),
910 read_complete_map: RangeSet::new(),
911 read_complete_cursor: 0,
912 send_state,
913 write_pool: Vec::new(),
914 recv_error_code: None,
915 send_error_code: None,
916 conn_error: None,
917 start_waiters: Vec::new(),
918 read_waiters: Vec::new(),
919 write_shutdown_waiters: Vec::new(),
920 }),
921 shared: StreamInnerShared {
922 msquic_stream,
923 local_open,
924 id: RwLock::new(None),
925 stream_type,
926 },
927 }
928 }
929
930 pub(crate) fn read_complete(&self, buffer: &StreamRecvBuffer) {
931 let buffer_range = buffer.range();
932 trace!(
933 "StreamInner({:p}) read complete offset={} len={}",
934 self,
935 buffer_range.start,
936 buffer_range.end - buffer_range.start
937 );
938
939 let complete_len = if !buffer_range.is_empty() {
940 let mut exclusive = self.exclusive.lock().unwrap();
941 exclusive.read_complete_map.insert(buffer_range);
942 let complete_range = exclusive.read_complete_map.first().unwrap();
943 trace!(
944 "StreamInner({:p}) complete read offset={} len={}",
945 self,
946 complete_range.start,
947 complete_range.end - complete_range.start
948 );
949 if complete_range.start == 0 && exclusive.read_complete_cursor < complete_range.end {
950 let complete_len = complete_range.end - exclusive.read_complete_cursor;
951 exclusive.read_complete_cursor = complete_range.end;
952 complete_len
953 } else {
954 0
955 }
956 } else {
957 0
958 };
959 if complete_len > 0 {
960 trace!(
961 "StreamInner({:p}) call receive_complete len={}",
962 self,
963 complete_len
964 );
965 let _ = self
966 .shared
967 .msquic_stream
968 .receive_complete(complete_len as u64);
969 }
970 }
971
972 fn handle_event_start_complete(
973 _stream: msquic::Handle,
974 inner: &Self,
975 payload: &msquic::StreamEventStartComplete,
976 ) -> u32 {
977 if msquic::Status::succeeded(payload.status) {
978 inner.shared.id.write().unwrap().replace(payload.id);
979 }
980 trace!(
981 "Stream({:p}, id={:?}) start complete status=0x{:x}, peer_accepted={}, id={}",
982 inner,
983 inner.shared.id.read(),
984 payload.status,
985 payload.bit_flags.peer_accepted(),
986 payload.id,
987 );
988 let mut exclusive = inner.exclusive.lock().unwrap();
989 exclusive.start_status = Some(payload.status);
990 if msquic::Status::succeeded(payload.status) && payload.bit_flags.peer_accepted() == 1 {
991 exclusive.state = StreamState::StartComplete;
992 if inner.shared.stream_type == StreamType::Bidirectional {
993 exclusive.recv_state = StreamRecvState::StartComplete;
994 }
995 exclusive.send_state = StreamSendState::StartComplete;
996 }
997
998 if payload.status == msquic::QUIC_STATUS_STREAM_LIMIT_REACHED
999 || payload.bit_flags.peer_accepted() == 1
1000 {
1001 exclusive
1002 .start_waiters
1003 .drain(..)
1004 .for_each(|waker| waker.wake());
1005 }
1006 msquic::QUIC_STATUS_SUCCESS
1007 }
1008
1009 fn handle_event_receive(
1010 _stream: msquic::Handle,
1011 inner: &Self,
1012 payload: &msquic::StreamEventReceive,
1013 ) -> u32 {
1014 trace!(
1015 "Stream({:p}, id={:?}) Receive {} offsets {} bytes, fin {}",
1016 inner,
1017 inner.shared.id.read(),
1018 payload.absolute_offset,
1019 payload.total_buffer_length,
1020 (payload.flags & msquic::RECEIVE_FLAG_FIN) == msquic::RECEIVE_FLAG_FIN
1021 );
1022
1023 let buffers =
1024 unsafe { std::slice::from_raw_parts(payload.buffer, payload.buffer_count as usize) };
1025
1026 let arc_inner: Arc<Self> = unsafe { Arc::from_raw(inner as *const _) };
1027
1028 let recv_buffer = StreamRecvBuffer::new(
1029 payload.absolute_offset as usize,
1030 &buffers,
1031 (payload.flags & msquic::RECEIVE_FLAG_FIN) == msquic::RECEIVE_FLAG_FIN,
1032 );
1033
1034 let _ = Arc::into_raw(arc_inner);
1035
1036 let mut exclusive = inner.exclusive.lock().unwrap();
1037 exclusive.recv_buffers.push_back(recv_buffer);
1038 exclusive
1039 .read_waiters
1040 .drain(..)
1041 .for_each(|waker| waker.wake());
1042 msquic::QUIC_STATUS_PENDING
1043 }
1044
1045 fn handle_event_send_complete(
1046 _stream: msquic::Handle,
1047 inner: &Self,
1048 payload: &msquic::StreamEventSendComplete,
1049 ) -> u32 {
1050 trace!(
1051 "Stream({:p}, id={:?}) Send complete",
1052 inner,
1053 inner.shared.id.read()
1054 );
1055
1056 let mut write_buf = unsafe { WriteBuffer::from_raw(payload.client_context) };
1057 let mut exclusive = inner.exclusive.lock().unwrap();
1058 write_buf.reset();
1059 exclusive.write_pool.push(write_buf);
1060 msquic::QUIC_STATUS_SUCCESS
1061 }
1062
1063 fn handle_event_peer_send_shutdown(_stream: msquic::Handle, inner: &Self) -> u32 {
1064 trace!(
1065 "Stream({:p}, id={:?}) Peer send shutdown",
1066 inner,
1067 inner.shared.id.read()
1068 );
1069 let mut exclusive = inner.exclusive.lock().unwrap();
1070 exclusive.recv_state = StreamRecvState::ShutdownComplete;
1071 exclusive
1072 .read_waiters
1073 .drain(..)
1074 .for_each(|waker| waker.wake());
1075 msquic::QUIC_STATUS_SUCCESS
1076 }
1077
1078 fn handle_event_peer_send_aborted(
1079 _stream: msquic::Handle,
1080 inner: &Self,
1081 payload: &msquic::StreamEventPeerSendAborted,
1082 ) -> u32 {
1083 trace!(
1084 "Stream({:p}, id={:?}) Peer send aborted",
1085 inner,
1086 inner.shared.id.read()
1087 );
1088 let mut exclusive = inner.exclusive.lock().unwrap();
1089 exclusive.recv_state = StreamRecvState::ShutdownComplete;
1090 exclusive.recv_error_code = Some(payload.error_code);
1091 exclusive
1092 .read_waiters
1093 .drain(..)
1094 .for_each(|waker| waker.wake());
1095 msquic::QUIC_STATUS_SUCCESS
1096 }
1097
1098 fn handle_event_peer_receive_aborted(
1099 _stream: msquic::Handle,
1100 inner: &Self,
1101 payload: &msquic::StreamEventPeerReceiveAborted,
1102 ) -> u32 {
1103 trace!(
1104 "Stream({:p}, id={:?}) Peer receive aborted",
1105 inner,
1106 inner.shared.id.read()
1107 );
1108 let mut exclusive = inner.exclusive.lock().unwrap();
1109 exclusive.send_state = StreamSendState::ShutdownComplete;
1110 exclusive.send_error_code = Some(payload.error_code);
1111 exclusive
1112 .write_shutdown_waiters
1113 .drain(..)
1114 .for_each(|waker| waker.wake());
1115 msquic::QUIC_STATUS_SUCCESS
1116 }
1117
1118 fn handle_event_send_shutdown_complete(
1119 _stream: msquic::Handle,
1120 inner: &Self,
1121 _payload: &msquic::StreamEventSendShutdownComplete,
1122 ) -> u32 {
1123 trace!(
1124 "Stream({:p}, id={:?}) Send shutdown complete",
1125 inner,
1126 inner.shared.id.read()
1127 );
1128 let mut exclusive = inner.exclusive.lock().unwrap();
1129 exclusive.send_state = StreamSendState::ShutdownComplete;
1130 exclusive
1131 .write_shutdown_waiters
1132 .drain(..)
1133 .for_each(|waker| waker.wake());
1134 msquic::QUIC_STATUS_SUCCESS
1135 }
1136
1137 fn handle_event_shutdown_complete(
1138 _stream: msquic::Handle,
1139 inner: &Self,
1140 payload: &msquic::StreamEventShutdownComplete,
1141 ) -> u32 {
1142 trace!(
1143 "Stream({:p}, id={:?}) Shutdown complete",
1144 inner,
1145 inner.shared.id.read()
1146 );
1147 {
1148 let mut exclusive = inner.exclusive.lock().unwrap();
1149 exclusive.state = StreamState::ShutdownComplete;
1150 exclusive.recv_state = StreamRecvState::ShutdownComplete;
1151 exclusive.send_state = StreamSendState::ShutdownComplete;
1152 if payload.connection_shutdown {
1153 match (
1154 payload.bit_flags.conn_shutdown_by_app() == 1,
1155 payload.bit_flags.conn_closed_remotely() == 1,
1156 ) {
1157 (true, true) => {
1158 exclusive.conn_error = Some(ConnectionError::ShutdownByPeer(
1159 payload.connection_error_code,
1160 ));
1161 }
1162 (true, false) => {
1163 exclusive.conn_error = Some(ConnectionError::ShutdownByLocal);
1164 }
1165 (false, true) | (false, false) => {
1166 exclusive.conn_error = Some(ConnectionError::ShutdownByTransport(
1167 payload.connection_close_status,
1168 payload.connection_error_code,
1169 ));
1170 }
1171 }
1172 }
1173 exclusive
1174 .start_waiters
1175 .drain(..)
1176 .for_each(|waker| waker.wake());
1177 exclusive
1178 .read_waiters
1179 .drain(..)
1180 .for_each(|waker| waker.wake());
1181 }
1182 unsafe {
1183 Arc::from_raw(inner as *const _);
1184 }
1185 msquic::QUIC_STATUS_SUCCESS
1186 }
1187
    /// Handles the ideal-send-buffer-size event; it is only traced and the
    /// payload is ignored.
    fn handle_event_ideal_send_buffer_size(
        _stream: msquic::Handle,
        inner: &Self,
        _payload: &msquic::StreamEventIdealSendBufferSize,
    ) -> u32 {
        trace!(
            "Stream({:p}, id={:?}) Ideal send buffer size",
            inner,
            inner.shared.id.read()
        );
        msquic::QUIC_STATUS_SUCCESS
    }
1200
1201 fn handle_event_peer_accepted(_stream: msquic::Handle, inner: &Self) -> u32 {
1202 trace!(
1203 "Stream({:p}, id={:?}) Peer accepted",
1204 inner,
1205 inner.shared.id.read()
1206 );
1207 let mut exclusive = inner.exclusive.lock().unwrap();
1208 exclusive.state = StreamState::StartComplete;
1209 if inner.shared.stream_type == StreamType::Bidirectional {
1210 exclusive.recv_state = StreamRecvState::StartComplete;
1211 }
1212 exclusive.send_state = StreamSendState::StartComplete;
1213 exclusive
1214 .start_waiters
1215 .drain(..)
1216 .for_each(|waker| waker.wake());
1217 msquic::QUIC_STATUS_SUCCESS
1218 }
1219
    /// Entry point registered with msquic for stream events.
    ///
    /// `context` is the pointer produced by `Arc::into_raw` when the callback
    /// was registered. It is only borrowed here; the shutdown-complete handler
    /// is what finally releases that reference. Each known event type is
    /// forwarded to its handler; unknown events are traced and acknowledged
    /// with `QUIC_STATUS_SUCCESS`.
    extern "C" fn native_callback(
        stream: msquic::Handle,
        context: *mut c_void,
        event: &msquic::StreamEvent,
    ) -> u32 {
        // SAFETY (as relied on here): `context` still points at a live
        // `StreamInner` because the callback's strong reference has not been
        // released yet.
        let inner = unsafe { &*(context as *const Self) };

        // Each payload access below picks the field matching `event_type`.
        match event.event_type {
            msquic::STREAM_EVENT_START_COMPLETE => {
                Self::handle_event_start_complete(stream, inner, unsafe {
                    &event.payload.start_complete
                })
            }
            msquic::STREAM_EVENT_RECEIVE => {
                Self::handle_event_receive(stream, inner, unsafe { &event.payload.receive })
            }
            msquic::STREAM_EVENT_SEND_COMPLETE => {
                Self::handle_event_send_complete(stream, inner, unsafe {
                    &event.payload.send_complete
                })
            }
            msquic::STREAM_EVENT_PEER_SEND_SHUTDOWN => {
                Self::handle_event_peer_send_shutdown(stream, inner)
            }
            msquic::STREAM_EVENT_PEER_SEND_ABORTED => {
                Self::handle_event_peer_send_aborted(stream, inner, unsafe {
                    &event.payload.peer_send_aborted
                })
            }
            msquic::STREAM_EVENT_PEER_RECEIVE_ABORTED => {
                Self::handle_event_peer_receive_aborted(stream, inner, unsafe {
                    &event.payload.peer_receive_aborted
                })
            }
            msquic::STREAM_EVENT_SEND_SHUTDOWN_COMPLETE => {
                Self::handle_event_send_shutdown_complete(stream, inner, unsafe {
                    &event.payload.send_shutdown_complete
                })
            }
            msquic::STREAM_EVENT_SHUTDOWN_COMPLETE => {
                Self::handle_event_shutdown_complete(stream, inner, unsafe {
                    &event.payload.shutdown_complete
                })
            }
            msquic::STREAM_EVENT_IDEAL_SEND_BUFFER_SIZE => {
                Self::handle_event_ideal_send_buffer_size(stream, inner, unsafe {
                    &event.payload.ideal_send_buffer_size
                })
            }
            msquic::STREAM_EVENT_PEER_ACCEPTED => Self::handle_event_peer_accepted(stream, inner),
            _ => {
                trace!("Stream({:p}) Other callback {}", inner, event.event_type);
                msquic::QUIC_STATUS_SUCCESS
            }
        }
    }
1276}
1277
1278impl Drop for StreamInner {
1279 fn drop(&mut self) {
1280 trace!("StreamInner({:p}) dropping", self);
1281 }
1282}
1283
1284impl fmt::Debug for StreamInnerExclusive {
1285 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1286 f.debug_struct("Exclusive")
1287 .field("state", &self.state)
1288 .field("recv_state", &self.recv_state)
1289 .field("send_state", &self.send_state)
1290 .finish()
1291 }
1292}
1293
1294impl fmt::Debug for StreamInnerShared {
1295 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1296 f.debug_struct("Shared")
1297 .field("type", &self.stream_type)
1298 .field("id", &self.id)
1299 .finish()
1300 }
1301}
1302pub struct ReadChunk<'a> {
1303 stream: &'a StreamInstance,
1304}
1305
1306impl Future for ReadChunk<'_> {
1307 type Output = Result<Option<StreamRecvBuffer>, ReadError>;
1308
1309 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1310 self.stream.poll_read_chunk(cx)
1311 }
1312}
1313
1314pub struct WriteChunk<'a> {
1315 stream: &'a StreamInstance,
1316 chunk: &'a Bytes,
1317 fin: bool,
1318}
1319
1320impl Future for WriteChunk<'_> {
1321 type Output = Result<usize, WriteError>;
1322
1323 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1324 self.stream.poll_write_chunk(cx, self.chunk, self.fin)
1325 }
1326}
1327
1328pub struct WriteChunks<'a> {
1329 stream: &'a StreamInstance,
1330 chunks: &'a [Bytes],
1331 fin: bool,
1332}
1333
1334impl Future for WriteChunks<'_> {
1335 type Output = Result<usize, WriteError>;
1336
1337 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1338 self.stream.poll_write_chunks(cx, self.chunks, self.fin)
1339 }
1340}
1341
#[cfg(feature = "tokio")]
impl tokio::io::AsyncRead for Stream {
    /// Reads received stream data into the unfilled part of `buf`.
    ///
    /// Delegates to the inherent `Stream::poll_read` and converts its error
    /// into `std::io::Error`. Bytes already filled in `buf` are left untouched,
    /// and the filled region grows by the number of bytes read.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // Pass only the unfilled tail (initialising it if needed). The whole
        // initialised region would overwrite data a caller already placed in
        // `buf`, and would be empty for a buffer that starts uninitialised.
        let len = ready!(Self::poll_read(self.get_mut(), cx, buf.initialize_unfilled()))?;
        // Append to the filled region rather than resetting its length.
        buf.advance(len);
        Poll::Ready(Ok(()))
    }
}
1354
#[cfg(feature = "tokio")]
impl tokio::io::AsyncWrite for Stream {
    /// Writes `buf` through the inherent `Stream::poll_write` with the final
    /// flag argument set to `false`; errors are converted to `std::io::Error`.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        let this = self.get_mut();
        Self::poll_write(this, cx, buf, false).map_err(std::io::Error::from)
    }

    /// Flushing does nothing at this layer and completes immediately.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<std::io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    /// Shuts down by driving the inherent `poll_finish_write` to completion.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<std::io::Result<()>> {
        let this = self.get_mut();
        Self::poll_finish_write(this, cx)
            .map_ok(|_| ())
            .map_err(std::io::Error::from)
    }
}
1375
#[cfg(feature = "tokio")]
impl tokio::io::AsyncRead for ReadStream {
    /// Reads received stream data into the unfilled part of `buf`.
    ///
    /// Delegates to the inherent `ReadStream::poll_read` and converts its error
    /// into `std::io::Error`. Bytes already filled in `buf` are left untouched,
    /// and the filled region grows by the number of bytes read.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // Pass only the unfilled tail (initialising it if needed). The whole
        // initialised region would overwrite data a caller already placed in
        // `buf`, and would be empty for a buffer that starts uninitialised.
        let len = ready!(Self::poll_read(self.get_mut(), cx, buf.initialize_unfilled()))?;
        // Append to the filled region rather than resetting its length.
        buf.advance(len);
        Poll::Ready(Ok(()))
    }
}
1388
#[cfg(feature = "tokio")]
impl tokio::io::AsyncWrite for WriteStream {
    /// Writes `buf` through the inherent `WriteStream::poll_write` with the
    /// final flag argument set to `false`; errors are converted to
    /// `std::io::Error`.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        let this = self.get_mut();
        Self::poll_write(this, cx, buf, false).map_err(std::io::Error::from)
    }

    /// Flushing does nothing at this layer and completes immediately.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<std::io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    /// Shuts down by driving the inherent `poll_finish_write` to completion.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<std::io::Result<()>> {
        let this = self.get_mut();
        Self::poll_finish_write(this, cx)
            .map_ok(|_| ())
            .map_err(std::io::Error::from)
    }
}
1409
1410impl futures_io::AsyncRead for Stream {
1411 fn poll_read(
1412 self: Pin<&mut Self>,
1413 cx: &mut Context<'_>,
1414 buf: &mut [u8],
1415 ) -> Poll<std::io::Result<usize>> {
1416 let len = ready!(Self::poll_read(self.get_mut(), cx, buf))?;
1417 Poll::Ready(Ok(len))
1418 }
1419}
1420
1421impl futures_io::AsyncWrite for Stream {
1422 fn poll_write(
1423 self: Pin<&mut Self>,
1424 cx: &mut Context<'_>,
1425 buf: &[u8],
1426 ) -> Poll<std::io::Result<usize>> {
1427 let len = ready!(Self::poll_write(self.get_mut(), cx, buf, false))?;
1428 Poll::Ready(Ok(len))
1429 }
1430
1431 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1432 Poll::Ready(Ok(()))
1433 }
1434
1435 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1436 ready!(Self::poll_finish_write(self.get_mut(), cx))?;
1437 Poll::Ready(Ok(()))
1438 }
1439}
1440
1441impl futures_io::AsyncRead for ReadStream {
1442 fn poll_read(
1443 self: Pin<&mut Self>,
1444 cx: &mut Context<'_>,
1445 buf: &mut [u8],
1446 ) -> Poll<std::io::Result<usize>> {
1447 let len = ready!(Self::poll_read(self.get_mut(), cx, buf))?;
1448 Poll::Ready(Ok(len))
1449 }
1450}
1451
1452impl futures_io::AsyncWrite for WriteStream {
1453 fn poll_write(
1454 self: Pin<&mut Self>,
1455 cx: &mut Context<'_>,
1456 buf: &[u8],
1457 ) -> Poll<std::io::Result<usize>> {
1458 let len = ready!(Self::poll_write(self.get_mut(), cx, buf, false))?;
1459 Poll::Ready(Ok(len))
1460 }
1461
1462 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1463 Poll::Ready(Ok(()))
1464 }
1465
1466 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1467 ready!(Self::poll_finish_write(self.get_mut(), cx))?;
1468 Poll::Ready(Ok(()))
1469 }
1470}
1471
/// Result of a read attempt, tagged with how the read side stands afterwards.
///
/// NOTE(review): variant meanings are inferred from their names — `Readable`
/// carries a value, while `Finished` and `Blocked` may or may not carry one.
enum ReadStatus<T> {
    Readable(T),
    Finished(Option<T>),
    Blocked(Option<T>),
}

impl<T> From<(Option<T>, bool)> for ReadStatus<T> {
    /// Maps `(value, true)` to `Finished(value)` and `(value, false)` to
    /// `Blocked(value)`. `Readable` is never produced by this conversion.
    fn from(status: (Option<T>, bool)) -> Self {
        let (read, finished) = status;
        if finished {
            Self::Finished(read)
        } else {
            Self::Blocked(read)
        }
    }
}
1486
/// Result of a write attempt, tagged with how the write side stands afterwards.
///
/// NOTE(review): variant meanings are inferred from their names — `Writable`
/// carries a value, while `Finished` and `Blocked` may or may not carry one.
enum WriteStatus<T> {
    Writable(T),
    Finished(Option<T>),
    Blocked(Option<T>),
}

impl<T> From<(Option<T>, bool)> for WriteStatus<T> {
    /// Maps `(value, true)` to `Finished(value)` and `(value, false)` to
    /// `Blocked(value)`. `Writable` is never produced by this conversion.
    fn from(status: (Option<T>, bool)) -> Self {
        let (write, finished) = status;
        if finished {
            Self::Finished(write)
        } else {
            Self::Blocked(write)
        }
    }
}
1501
/// Error produced while opening or starting a stream (returned by `Stream::open`).
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum StartError {
    /// The connection has not been started yet.
    #[error("connection not started yet")]
    ConnectionNotStarted,
    /// The stream count limit has been reached.
    #[error("reach stream count limit")]
    LimitReached,
    /// The connection was lost; wraps the underlying `ConnectionError`.
    #[error("connection lost")]
    ConnectionLost(#[from] ConnectionError),
    /// Any other failure, carrying the raw status code (displayed in hex).
    #[error("other error: status 0x{0:x}")]
    OtherError(u32),
}
1513
1514#[derive(Debug, Error, Clone, PartialEq, Eq)]
1515pub enum ReadError {
1516 #[error("stream not opened for reading")]
1517 Closed,
1518 #[error("stream reset by peer: error {0}")]
1519 Reset(u64),
1520 #[error("connection lost")]
1521 ConnectionLost(#[from] ConnectionError),
1522 #[error("other error: status 0x{0:x}")]
1523 OtherError(u32),
1524}
1525
1526impl From<ReadError> for std::io::Error {
1527 fn from(e: ReadError) -> Self {
1528 let kind = match e {
1529 ReadError::Closed => std::io::ErrorKind::NotConnected,
1530 ReadError::Reset(_) => std::io::ErrorKind::ConnectionReset,
1531 ReadError::ConnectionLost(ConnectionError::ConnectionClosed) => {
1532 std::io::ErrorKind::NotConnected
1533 }
1534 ReadError::ConnectionLost(_) => std::io::ErrorKind::ConnectionAborted,
1535 ReadError::OtherError(_) => std::io::ErrorKind::Other,
1536 };
1537 Self::new(kind, e)
1538 }
1539}
1540
1541#[derive(Debug, Error, Clone, PartialEq, Eq)]
1542pub enum WriteError {
1543 #[error("stream not opened for writing")]
1544 Closed,
1545 #[error("stream finished")]
1546 Finished,
1547 #[error("stream stopped by peer: error {0}")]
1548 Stopped(u64),
1549 #[error("connection lost")]
1550 ConnectionLost(#[from] ConnectionError),
1551 #[error("other error: status 0x{0:x}")]
1552 OtherError(u32),
1553}
1554
1555impl From<WriteError> for std::io::Error {
1556 fn from(e: WriteError) -> Self {
1557 let kind = match e {
1558 WriteError::Closed
1559 | WriteError::Finished
1560 | WriteError::ConnectionLost(ConnectionError::ConnectionClosed) => {
1561 std::io::ErrorKind::NotConnected
1562 }
1563 WriteError::Stopped(_) => std::io::ErrorKind::ConnectionReset,
1564 WriteError::ConnectionLost(_) => std::io::ErrorKind::ConnectionAborted,
1565 WriteError::OtherError(_) => std::io::ErrorKind::Other,
1566 };
1567 Self::new(kind, e)
1568 }
1569}