// msquic_async/stream.rs

use crate::buffer::{StreamRecvBuffer, WriteBuffer};
use crate::connection::ConnectionError;

#[cfg(feature = "msquic-2-5")]
use msquic_v2_5 as msquic;
#[cfg(feature = "msquic-seera")]
use seera_msquic as msquic;

use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex, RwLock};
use std::task::{ready, Context, Poll, Waker};

use bytes::Bytes;
use libc::c_void;
use rangemap::RangeSet;
use thiserror::Error;
use tracing::trace;
21
/// Directionality of a QUIC stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamType {
    /// Both endpoints can send and receive on the stream.
    Bidirectional,
    /// Only the endpoint that opened the stream can send; the peer can only
    /// receive.
    Unidirectional,
}
27
28/// A stream represents a bidirectional or unidirectional stream.
29#[derive(Debug)]
30pub struct Stream(Arc<StreamInstance>);
31
32impl Stream {
33    pub(crate) fn open(
34        msquic_conn: &msquic::Connection,
35        stream_type: StreamType,
36    ) -> Result<Self, StartError> {
37        let flags = if stream_type == StreamType::Unidirectional {
38            msquic::StreamOpenFlags::UNIDIRECTIONAL
39        } else {
40            msquic::StreamOpenFlags::NONE
41        };
42        let inner = Arc::new(StreamInner::new(
43            stream_type,
44            StreamSendState::Closed,
45            StreamRecvState::Closed,
46            true,
47        ));
48        let inner_in_ev = inner.clone();
49        let msquic_stream = msquic::Stream::open(msquic_conn, flags, move |stream_ref, ev| {
50            inner_in_ev.callback_handler_impl(stream_ref, ev)
51        })
52        .map_err(StartError::OtherError)?;
53        let stream_handle = unsafe { msquic_stream.as_raw() };
54        let instance = Arc::new(StreamInstance {
55            inner,
56            msquic_stream,
57        });
58        trace!(
59            "StreamInstance({:p}, Inner: {:p}, HQUIC: {:p}) Open by local",
60            instance,
61            instance.inner,
62            stream_handle
63        );
64
65        Ok(Self(instance))
66    }
67
68    pub(crate) fn from_raw(handle: msquic::ffi::HQUIC, stream_type: StreamType) -> Self {
69        let msquic_stream = unsafe { msquic::Stream::from_raw(handle) };
70        let send_state = if stream_type == StreamType::Bidirectional {
71            StreamSendState::StartComplete
72        } else {
73            StreamSendState::Closed
74        };
75        let inner = Arc::new(StreamInner::new(
76            stream_type,
77            send_state,
78            StreamRecvState::StartComplete,
79            false,
80        ));
81        let inner_in_ev = inner.clone();
82        msquic_stream.set_callback_handler(move |stream_ref, ev| {
83            inner_in_ev.callback_handler_impl(stream_ref, ev)
84        });
85        let stream_handle = unsafe { msquic_stream.as_raw() };
86        let stream = Self(Arc::new(StreamInstance {
87            inner,
88            msquic_stream,
89        }));
90        trace!(
91            "StreamInstance({:p}, Inner: {:p}, HQUIC: {:p}, id: {:?}) Start by peer",
92            stream.0,
93            stream.0.inner,
94            stream_handle,
95            stream.id()
96        );
97        stream
98    }
99
100    pub(crate) fn poll_start(
101        &mut self,
102        cx: &mut Context,
103        failed_on_block: bool,
104    ) -> Poll<Result<(), StartError>> {
105        let mut exclusive = self.0.inner.exclusive.lock().unwrap();
106        trace!(
107            "Stream(Inner: {:p}) poll_start state={:?}",
108            self.0.inner,
109            exclusive.state
110        );
111        match exclusive.state {
112            StreamState::Open => {
113                let res = self
114                    .0
115                    .msquic_stream
116                    .start(
117                        msquic::StreamStartFlags::SHUTDOWN_ON_FAIL
118                            | msquic::StreamStartFlags::INDICATE_PEER_ACCEPT
119                            | if failed_on_block {
120                                msquic::StreamStartFlags::FAIL_BLOCKED
121                            } else {
122                                msquic::StreamStartFlags::NONE
123                            },
124                    )
125                    .map_err(StartError::OtherError);
126                trace!(
127                    "Stream(Inner: {:p}) poll_start start={:?}",
128                    self.0.inner,
129                    res
130                );
131                res?;
132                exclusive.state = StreamState::Start;
133                if self.0.inner.shared.stream_type == StreamType::Bidirectional {
134                    exclusive.recv_state = StreamRecvState::Start;
135                }
136                exclusive.send_state = StreamSendState::Start;
137            }
138            StreamState::Start => {}
139            _ => {
140                if let Some(start_status) = &exclusive.start_status {
141                    if start_status.is_ok() {
142                        return Poll::Ready(Ok(()));
143                    }
144                    return Poll::Ready(Err(match start_status.try_as_status_code().unwrap() {
145                        msquic::StatusCode::QUIC_STATUS_STREAM_LIMIT_REACHED => {
146                            StartError::LimitReached
147                        }
148                        msquic::StatusCode::QUIC_STATUS_ABORTED
149                        | msquic::StatusCode::QUIC_STATUS_INVALID_STATE => {
150                            StartError::ConnectionLost(
151                                exclusive.conn_error.as_ref().expect("conn_error").clone(),
152                            )
153                        }
154                        _ => StartError::OtherError(start_status.clone()),
155                    }));
156                } else {
157                    return Poll::Ready(Ok(()));
158                }
159            }
160        }
161        exclusive.start_waiters.push(cx.waker().clone());
162        Poll::Pending
163    }
164
165    /// Returns the stream ID.
166    pub fn id(&self) -> Option<u64> {
167        self.0.id()
168    }
169
170    /// Splits the stream into a read stream and a write stream.
171    pub fn split(self) -> (Option<ReadStream>, Option<WriteStream>) {
172        match (
173            self.0.inner.shared.stream_type,
174            self.0.inner.shared.local_open,
175        ) {
176            (StreamType::Unidirectional, true) => (None, Some(WriteStream(self.0))),
177            (StreamType::Unidirectional, false) => (Some(ReadStream(self.0)), None),
178            (StreamType::Bidirectional, _) => {
179                (Some(ReadStream(self.0.clone())), Some(WriteStream(self.0)))
180            }
181        }
182    }
183
184    /// Poll to read from the stream into buf.
185    pub fn poll_read(
186        &mut self,
187        cx: &mut Context<'_>,
188        buf: &mut [u8],
189    ) -> Poll<Result<usize, ReadError>> {
190        self.0.poll_read(cx, buf)
191    }
192
193    /// Poll to read the next segment of data.
194    pub fn poll_read_chunk(
195        &self,
196        cx: &mut Context<'_>,
197    ) -> Poll<Result<Option<StreamRecvBuffer>, ReadError>> {
198        self.0.poll_read_chunk(cx)
199    }
200
201    /// Read the next segment of data.
202    pub fn read_chunk(&self) -> ReadChunk<'_> {
203        self.0.read_chunk()
204    }
205
206    /// Poll to write to the stream from buf.
207    pub fn poll_write(
208        &mut self,
209        cx: &mut Context<'_>,
210        buf: &[u8],
211        fin: bool,
212    ) -> Poll<Result<usize, WriteError>> {
213        self.0.poll_write(cx, buf, fin)
214    }
215
216    /// Poll to write a bytes to the stream directly.
217    pub fn poll_write_chunk(
218        &mut self,
219        cx: &mut Context<'_>,
220        chunk: &Bytes,
221        fin: bool,
222    ) -> Poll<Result<usize, WriteError>> {
223        self.0.poll_write_chunk(cx, chunk, fin)
224    }
225
226    /// Write a bytes to the stream directly.
227    pub fn write_chunk<'a>(&'a mut self, chunk: &'a Bytes, fin: bool) -> WriteChunk<'a> {
228        self.0.write_chunk(chunk, fin)
229    }
230
231    /// Poll to write the list of bytes to the stream directly.
232    pub fn poll_write_chunks(
233        &mut self,
234        cx: &mut Context<'_>,
235        chunks: &[Bytes],
236        fin: bool,
237    ) -> Poll<Result<usize, WriteError>> {
238        self.0.poll_write_chunks(cx, chunks, fin)
239    }
240
241    /// Write the list of bytes to the stream directly.
242    pub fn write_chunks<'a>(&'a mut self, chunks: &'a [Bytes], fin: bool) -> WriteChunks<'a> {
243        self.0.write_chunks(chunks, fin)
244    }
245
246    /// Poll to finish writing to the stream.
247    pub fn poll_finish_write(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), WriteError>> {
248        self.0.poll_finish_write(cx)
249    }
250
251    /// Poll to abort writing to the stream.
252    pub fn poll_abort_write(
253        &mut self,
254        cx: &mut Context<'_>,
255        error_code: u64,
256    ) -> Poll<Result<(), WriteError>> {
257        self.0.poll_abort_write(cx, error_code)
258    }
259
260    /// Abort writing to the stream.
261    pub fn abort_write(&mut self, error_code: u64) -> Result<(), WriteError> {
262        self.0.abort_write(error_code)
263    }
264
265    /// Poll to abort reading from the stream.
266    pub fn poll_abort_read(
267        &mut self,
268        cx: &mut Context<'_>,
269        error_code: u64,
270    ) -> Poll<Result<(), ReadError>> {
271        self.0.poll_abort_read(cx, error_code)
272    }
273
274    /// Abort reading from the stream.
275    pub fn abort_read(&mut self, error_code: u64) -> Result<(), ReadError> {
276        self.0.abort_read(error_code)
277    }
278}
279
280/// A stream that can only be read from.
281#[derive(Debug)]
282pub struct ReadStream(Arc<StreamInstance>);
283
284impl ReadStream {
285    /// Returns the stream ID.
286    pub fn id(&self) -> Option<u64> {
287        self.0.id()
288    }
289
290    /// Poll to read from the stream into buf.
291    pub fn poll_read(
292        &mut self,
293        cx: &mut Context<'_>,
294        buf: &mut [u8],
295    ) -> Poll<Result<usize, ReadError>> {
296        self.0.poll_read(cx, buf)
297    }
298
299    /// Poll to read the next segment of data.
300    pub fn poll_read_chunk(
301        &self,
302        cx: &mut Context<'_>,
303    ) -> Poll<Result<Option<StreamRecvBuffer>, ReadError>> {
304        self.0.poll_read_chunk(cx)
305    }
306
307    /// Read the next segment of data.
308    pub fn read_chunk(&self) -> ReadChunk<'_> {
309        self.0.read_chunk()
310    }
311
312    /// Poll to abort reading from the stream.
313    pub fn poll_abort_read(
314        &mut self,
315        cx: &mut Context<'_>,
316        error_code: u64,
317    ) -> Poll<Result<(), ReadError>> {
318        self.0.poll_abort_read(cx, error_code)
319    }
320
321    /// Abort reading from the stream.
322    pub fn abort_read(&mut self, error_code: u64) -> Result<(), ReadError> {
323        self.0.abort_read(error_code)
324    }
325}
326
327/// A stream that can only be written to.
328#[derive(Debug)]
329pub struct WriteStream(Arc<StreamInstance>);
330
331impl WriteStream {
332    /// Returns the stream ID.
333    pub fn id(&self) -> Option<u64> {
334        self.0.id()
335    }
336
337    /// Poll to write to the stream from buf.
338    pub fn poll_write(
339        &mut self,
340        cx: &mut Context<'_>,
341        buf: &[u8],
342        fin: bool,
343    ) -> Poll<Result<usize, WriteError>> {
344        self.0.poll_write(cx, buf, fin)
345    }
346
347    /// Poll to write a bytes to the stream directly.
348    pub fn poll_write_chunk(
349        &mut self,
350        cx: &mut Context<'_>,
351        chunk: &Bytes,
352        fin: bool,
353    ) -> Poll<Result<usize, WriteError>> {
354        self.0.poll_write_chunk(cx, chunk, fin)
355    }
356
357    /// Write a bytes to the stream directly.
358    pub fn write_chunk<'a>(&'a mut self, chunk: &'a Bytes, fin: bool) -> WriteChunk<'a> {
359        self.0.write_chunk(chunk, fin)
360    }
361
362    /// Poll to write the list of bytes to the stream directly.
363    pub fn poll_write_chunks(
364        &mut self,
365        cx: &mut Context<'_>,
366        chunks: &[Bytes],
367        fin: bool,
368    ) -> Poll<Result<usize, WriteError>> {
369        self.0.poll_write_chunks(cx, chunks, fin)
370    }
371
372    /// Write the list of bytes to the stream directly.
373    pub fn write_chunks<'a>(&'a mut self, chunks: &'a [Bytes], fin: bool) -> WriteChunks<'a> {
374        self.0.write_chunks(chunks, fin)
375    }
376
377    /// Poll to finish writing to the stream.
378    pub fn poll_finish_write(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), WriteError>> {
379        self.0.poll_finish_write(cx)
380    }
381
382    /// Poll to abort writing to the stream.
383    pub fn poll_abort_write(
384        &mut self,
385        cx: &mut Context<'_>,
386        error_code: u64,
387    ) -> Poll<Result<(), WriteError>> {
388        self.0.poll_abort_write(cx, error_code)
389    }
390
391    /// Abort writing to the stream.
392    pub fn abort_write(&mut self, error_code: u64) -> Result<(), WriteError> {
393        self.0.abort_write(error_code)
394    }
395}
396
397#[derive(Debug)]
398pub(crate) struct StreamInstance {
399    inner: Arc<StreamInner>,
400    msquic_stream: msquic::Stream,
401}
402
403impl StreamInstance {
404    pub(crate) fn id(&self) -> Option<u64> {
405        let id = { *self.inner.shared.id.read().unwrap() };
406        if id.is_some() {
407            id
408        } else {
409            let res = unsafe {
410                msquic::Api::get_param_auto::<u64>(
411                    self.msquic_stream.as_raw(),
412                    msquic::PARAM_STREAM_ID,
413                )
414            };
415            if let Ok(id) = res {
416                self.inner.shared.id.write().unwrap().replace(id);
417                Some(id)
418            } else {
419                None
420            }
421        }
422    }
423
424    fn poll_read(
425        self: &Arc<Self>,
426        cx: &mut Context<'_>,
427        buf: &mut [u8],
428    ) -> Poll<Result<usize, ReadError>> {
429        self.poll_read_generic(cx, |recv_buffers, read_complete_buffers| {
430            let mut read = 0;
431            let mut fin = false;
432            loop {
433                if read == buf.len() {
434                    return ReadStatus::Readable(read);
435                }
436
437                match recv_buffers
438                    .front_mut()
439                    .and_then(|x| x.get_bytes_upto_size(buf.len() - read))
440                {
441                    Some(slice) => {
442                        let len = slice.len();
443                        buf[read..read + len].copy_from_slice(slice);
444                        read += len;
445                    }
446                    None => {
447                        if let Some(mut recv_buffer) = recv_buffers.pop_front() {
448                            recv_buffer.set_stream(self.clone());
449                            fin = recv_buffer.fin();
450                            read_complete_buffers.push(recv_buffer);
451                            continue;
452                        } else {
453                            return (if read > 0 { Some(read) } else { None }, fin).into();
454                        }
455                    }
456                }
457            }
458        })
459        .map(|res| res.map(|x| x.unwrap_or(0)))
460    }
461
462    fn poll_read_chunk(
463        self: &Arc<Self>,
464        cx: &mut Context<'_>,
465    ) -> Poll<Result<Option<StreamRecvBuffer>, ReadError>> {
466        self.poll_read_generic(cx, |recv_buffers, _| {
467            recv_buffers
468                .pop_front()
469                .map(|mut recv_buffer| {
470                    let fin = recv_buffer.fin();
471                    recv_buffer.set_stream(self.clone());
472                    (Some(recv_buffer), fin)
473                })
474                .unwrap_or((None, false))
475                .into()
476        })
477    }
478
479    fn read_chunk(self: &Arc<Self>) -> ReadChunk<'_> {
480        ReadChunk { stream: self }
481    }
482
483    fn poll_read_generic<T, U>(
484        &self,
485        cx: &mut Context<'_>,
486        mut read_fn: T,
487    ) -> Poll<Result<Option<U>, ReadError>>
488    where
489        T: FnMut(&mut VecDeque<StreamRecvBuffer>, &mut Vec<StreamRecvBuffer>) -> ReadStatus<U>,
490    {
491        let res;
492        let mut read_complete_buffers = Vec::new();
493        {
494            let mut exclusive = self.inner.exclusive.lock().unwrap();
495            match exclusive.recv_state {
496                StreamRecvState::Closed => {
497                    return Poll::Ready(Err(ReadError::Closed));
498                }
499                StreamRecvState::Start => {
500                    exclusive.start_waiters.push(cx.waker().clone());
501                    return Poll::Pending;
502                }
503                StreamRecvState::StartComplete => {}
504                StreamRecvState::Shutdown => {
505                    return Poll::Ready(Ok(None));
506                }
507                StreamRecvState::ShutdownComplete => {
508                    if let Some(conn_error) = &exclusive.conn_error {
509                        return Poll::Ready(Err(ReadError::ConnectionLost(conn_error.clone())));
510                    } else if let Some(error_code) = &exclusive.recv_error_code {
511                        return Poll::Ready(Err(ReadError::Reset(*error_code)));
512                    } else {
513                        return Poll::Ready(Ok(None));
514                    }
515                }
516            }
517
518            let status = read_fn(&mut exclusive.recv_buffers, &mut read_complete_buffers);
519
520            res = match status {
521                ReadStatus::Readable(read) | ReadStatus::Blocked(Some(read)) => {
522                    Poll::Ready(Ok(Some(read)))
523                }
524                ReadStatus::Finished(read) => {
525                    exclusive.recv_state = StreamRecvState::Shutdown;
526                    Poll::Ready(Ok(read))
527                }
528                ReadStatus::Blocked(None) => {
529                    exclusive.read_waiters.push(cx.waker().clone());
530                    Poll::Pending
531                }
532            };
533        }
534        res
535    }
536
537    fn poll_write(
538        &self,
539        cx: &mut Context<'_>,
540        buf: &[u8],
541        fin: bool,
542    ) -> Poll<Result<usize, WriteError>> {
543        self.poll_write_generic(cx, |write_buf| {
544            let written = write_buf.put_slice(buf);
545            if written == buf.len() && !fin {
546                WriteStatus::Writable(written)
547            } else {
548                (Some(written), fin).into()
549            }
550        })
551        .map(|res| res.map(|x| x.unwrap_or(0)))
552    }
553
554    fn poll_write_chunk(
555        &self,
556        cx: &mut Context<'_>,
557        chunk: &Bytes,
558        fin: bool,
559    ) -> Poll<Result<usize, WriteError>> {
560        self.poll_write_generic(cx, |write_buf| {
561            let written = write_buf.put_zerocopy(chunk);
562            if written == chunk.len() && !fin {
563                WriteStatus::Writable(written)
564            } else {
565                (Some(written), fin).into()
566            }
567        })
568        .map(|res| res.map(|x| x.unwrap_or(0)))
569    }
570
571    fn write_chunk<'a>(&'a self, chunk: &'a Bytes, fin: bool) -> WriteChunk<'a> {
572        WriteChunk {
573            stream: self,
574            chunk,
575            fin,
576        }
577    }
578
579    fn poll_write_chunks(
580        &self,
581        cx: &mut Context<'_>,
582        chunks: &[Bytes],
583        fin: bool,
584    ) -> Poll<Result<usize, WriteError>> {
585        self.poll_write_generic(cx, |write_buf| {
586            let (mut total_len, mut total_written) = (0, 0);
587            for buf in chunks {
588                total_len += buf.len();
589                total_written += write_buf.put_zerocopy(buf);
590            }
591            if total_written == total_len && !fin {
592                WriteStatus::Writable(total_written)
593            } else {
594                (Some(total_written), fin).into()
595            }
596        })
597        .map(|res| res.map(|x| x.unwrap_or(0)))
598    }
599
600    fn write_chunks<'a>(&'a self, chunks: &'a [Bytes], fin: bool) -> WriteChunks<'a> {
601        WriteChunks {
602            stream: self,
603            chunks,
604            fin,
605        }
606    }
607
608    fn poll_write_generic<T, U>(
609        &self,
610        _cx: &mut Context<'_>,
611        mut write_fn: T,
612    ) -> Poll<Result<Option<U>, WriteError>>
613    where
614        T: FnMut(&mut WriteBuffer) -> WriteStatus<U>,
615    {
616        let mut exclusive = self.inner.exclusive.lock().unwrap();
617        match exclusive.send_state {
618            StreamSendState::Closed => {
619                return Poll::Ready(Err(WriteError::Closed));
620            }
621            StreamSendState::Start => {
622                exclusive.start_waiters.push(_cx.waker().clone());
623                return Poll::Pending;
624            }
625            StreamSendState::StartComplete => {}
626            StreamSendState::Shutdown => {
627                return Poll::Ready(Err(WriteError::Finished));
628            }
629            StreamSendState::ShutdownComplete => {
630                if let Some(conn_error) = &exclusive.conn_error {
631                    return Poll::Ready(Err(WriteError::ConnectionLost(conn_error.clone())));
632                } else if let Some(error_code) = &exclusive.send_error_code {
633                    return Poll::Ready(Err(WriteError::Stopped(*error_code)));
634                } else {
635                    return Poll::Ready(Err(WriteError::Finished));
636                }
637            }
638        }
639        let mut write_buf = exclusive.write_pool.pop().unwrap_or(WriteBuffer::new());
640        let status = write_fn(&mut write_buf);
641        let buffers = unsafe {
642            let (data, len) = write_buf.get_buffers();
643            std::slice::from_raw_parts(data, len)
644        };
645        match status {
646            WriteStatus::Writable(val) | WriteStatus::Blocked(Some(val)) => {
647                match unsafe {
648                    self.msquic_stream.send(
649                        buffers,
650                        msquic::SendFlags::NONE,
651                        write_buf.into_raw() as *const _,
652                    )
653                }
654                .map_err(WriteError::OtherError)
655                {
656                    Ok(()) => Poll::Ready(Ok(Some(val))),
657                    Err(e) => Poll::Ready(Err(e)),
658                }
659            }
660            WriteStatus::Blocked(None) => unreachable!(),
661            WriteStatus::Finished(val) => {
662                match unsafe {
663                    self.msquic_stream.send(
664                        buffers,
665                        msquic::SendFlags::FIN,
666                        write_buf.into_raw() as *const _,
667                    )
668                }
669                .map_err(WriteError::OtherError)
670                {
671                    Ok(()) => {
672                        exclusive.send_state = StreamSendState::Shutdown;
673                        Poll::Ready(Ok(val))
674                    }
675                    Err(e) => Poll::Ready(Err(e)),
676                }
677            }
678        }
679    }
680
681    fn poll_finish_write(&self, cx: &mut Context<'_>) -> Poll<Result<(), WriteError>> {
682        let mut exclusive = self.inner.exclusive.lock().unwrap();
683        match exclusive.send_state {
684            StreamSendState::Start => {
685                exclusive.start_waiters.push(cx.waker().clone());
686                return Poll::Pending;
687            }
688            StreamSendState::StartComplete => {
689                match self
690                    .msquic_stream
691                    .shutdown(msquic::StreamShutdownFlags::GRACEFUL, 0)
692                    .map_err(WriteError::OtherError)
693                {
694                    Ok(()) => {
695                        exclusive.send_state = StreamSendState::Shutdown;
696                    }
697                    Err(e) => return Poll::Ready(Err(e)),
698                }
699            }
700            StreamSendState::Shutdown => {}
701            StreamSendState::ShutdownComplete => {
702                if let Some(conn_error) = &exclusive.conn_error {
703                    return Poll::Ready(Err(WriteError::ConnectionLost(conn_error.clone())));
704                } else if let Some(error_code) = &exclusive.send_error_code {
705                    return Poll::Ready(Err(WriteError::Stopped(*error_code)));
706                } else {
707                    return Poll::Ready(Ok(()));
708                }
709            }
710            _ => {
711                return Poll::Ready(Err(WriteError::Closed));
712            }
713        }
714        exclusive.write_shutdown_waiters.push(cx.waker().clone());
715        Poll::Pending
716    }
717
718    fn poll_abort_write(
719        &self,
720        cx: &mut Context<'_>,
721        error_code: u64,
722    ) -> Poll<Result<(), WriteError>> {
723        let mut exclusive = self.inner.exclusive.lock().unwrap();
724        match exclusive.send_state {
725            StreamSendState::Start => {
726                exclusive.start_waiters.push(cx.waker().clone());
727                return Poll::Pending;
728            }
729            StreamSendState::StartComplete => {
730                match self
731                    .msquic_stream
732                    .shutdown(msquic::StreamShutdownFlags::ABORT_SEND, error_code)
733                    .map_err(WriteError::OtherError)
734                {
735                    Ok(()) => {
736                        exclusive.send_state = StreamSendState::Shutdown;
737                    }
738                    Err(e) => return Poll::Ready(Err(e)),
739                }
740            }
741            StreamSendState::Shutdown => {}
742            StreamSendState::ShutdownComplete => {
743                if let Some(conn_error) = &exclusive.conn_error {
744                    return Poll::Ready(Err(WriteError::ConnectionLost(conn_error.clone())));
745                } else if let Some(error_code) = &exclusive.send_error_code {
746                    return Poll::Ready(Err(WriteError::Stopped(*error_code)));
747                } else {
748                    return Poll::Ready(Ok(()));
749                }
750            }
751            _ => {
752                return Poll::Ready(Err(WriteError::Closed));
753            }
754        }
755        exclusive.write_shutdown_waiters.push(cx.waker().clone());
756        Poll::Pending
757    }
758
759    fn abort_write(&self, error_code: u64) -> Result<(), WriteError> {
760        let mut exclusive = self.inner.exclusive.lock().unwrap();
761        match exclusive.send_state {
762            StreamSendState::StartComplete => {
763                self.msquic_stream
764                    .shutdown(msquic::StreamShutdownFlags::ABORT_SEND, error_code)
765                    .map_err(WriteError::OtherError)?;
766                exclusive.send_state = StreamSendState::Shutdown;
767                Ok(())
768            }
769            _ => Err(WriteError::Closed),
770        }
771    }
772
773    fn poll_abort_read(
774        &self,
775        cx: &mut Context<'_>,
776        error_code: u64,
777    ) -> Poll<Result<(), ReadError>> {
778        let mut exclusive = self.inner.exclusive.lock().unwrap();
779        match exclusive.recv_state {
780            StreamRecvState::Start => {
781                exclusive.start_waiters.push(cx.waker().clone());
782                Poll::Pending
783            }
784            StreamRecvState::StartComplete => {
785                match self
786                    .msquic_stream
787                    .shutdown(msquic::StreamShutdownFlags::ABORT_RECEIVE, error_code)
788                    .map_err(ReadError::OtherError)
789                {
790                    Ok(()) => {
791                        exclusive.recv_state = StreamRecvState::ShutdownComplete;
792                        exclusive
793                            .read_waiters
794                            .drain(..)
795                            .for_each(|waker| waker.wake());
796                        Poll::Ready(Ok(()))
797                    }
798                    Err(e) => Poll::Ready(Err(e)),
799                }
800            }
801            StreamRecvState::ShutdownComplete => {
802                if let Some(conn_error) = &exclusive.conn_error {
803                    Poll::Ready(Err(ReadError::ConnectionLost(conn_error.clone())))
804                } else if let Some(error_code) = &exclusive.recv_error_code {
805                    Poll::Ready(Err(ReadError::Reset(*error_code)))
806                } else {
807                    Poll::Ready(Ok(()))
808                }
809            }
810            _ => Poll::Ready(Err(ReadError::Closed)),
811        }
812    }
813
814    fn abort_read(&self, error_code: u64) -> Result<(), ReadError> {
815        let mut exclusive = self.inner.exclusive.lock().unwrap();
816        match exclusive.recv_state {
817            StreamRecvState::StartComplete => {
818                self.msquic_stream
819                    .shutdown(msquic::StreamShutdownFlags::ABORT_RECEIVE, error_code)
820                    .map_err(ReadError::OtherError)?;
821                exclusive.recv_state = StreamRecvState::ShutdownComplete;
822            }
823            _ => {
824                return Err(ReadError::Closed);
825            }
826        }
827        Ok(())
828    }
829
830    pub(crate) fn read_complete(&self, buffer: &StreamRecvBuffer) {
831        let buffer_range = buffer.range();
832        trace!(
833            "StreamInstance({:p}) read complete offset={} len={}",
834            self,
835            buffer_range.start,
836            buffer_range.end - buffer_range.start
837        );
838
839        let mut exclusive = self.inner.exclusive.lock().unwrap();
840        if !buffer_range.is_empty() {
841            exclusive.read_complete_map.insert(buffer_range);
842        }
843        let complete_len = if let Some(complete_range) = exclusive.read_complete_map.first() {
844            trace!(
845                "StreamInstance({:p}) complete read offset={} len={}",
846                self,
847                complete_range.start,
848                complete_range.end - complete_range.start
849            );
850
851            if complete_range.start == 0 && exclusive.read_complete_cursor < complete_range.end {
852                let complete_len = complete_range.end - exclusive.read_complete_cursor;
853                exclusive.read_complete_cursor = complete_range.end;
854                Some(complete_len)
855            } else if complete_range.start == 0
856                && exclusive.read_complete_cursor == complete_range.end
857                && buffer.offset() == complete_range.end
858                && buffer.is_empty()
859                && buffer.fin()
860            {
861                Some(0)
862            } else {
863                None
864            }
865        } else if buffer.is_empty() && buffer.fin() {
866            Some(0)
867        } else {
868            None
869        };
870        if let Some(complete_len) = complete_len {
871            trace!(
872                "StreamInstance({:p}) call receive_complete len={}",
873                self,
874                complete_len
875            );
876            self.msquic_stream.receive_complete(complete_len as u64);
877        }
878    }
879}
880
881impl Drop for StreamInstance {
882    fn drop(&mut self) {
883        trace!("StreamInstance({:p}) dropping", self);
884        let exclusive = self.inner.exclusive.lock().unwrap();
885        match exclusive.state {
886            StreamState::Start | StreamState::StartComplete => {
887                trace!(
888                    "StreamInstance(Inner: {:p}) shutdown while dropping",
889                    self.inner
890                );
891                // let _ = self.msquic_stream.shutdown(
892                //     msquic::StreamShutdownFlags::ABORT_SEND
893                //         | msquic::StreamShutdownFlags::ABORT_RECEIVE
894                //         | msquic::StreamShutdownFlags::IMMEDIATE,
895                //     0,
896                // );
897            }
898            _ => {}
899        }
900    }
901}
902
/// State shared between a `StreamInstance` and the MsQuic stream callback
/// closure (both hold an `Arc` to it).
#[derive(Debug)]
struct StreamInner {
    /// Mutable stream state; every event handler locks this before touching it.
    exclusive: Mutex<StreamInnerExclusive>,
    /// Data fixed at construction plus the stream id, which has its own lock.
    pub(crate) shared: StreamInnerShared,
}
908
/// Mutable per-stream state, protected by `StreamInner::exclusive`.
struct StreamInnerExclusive {
    /// Overall lifecycle state of the stream.
    state: StreamState,
    /// Status delivered with the `StartComplete` event, once it has arrived.
    start_status: Option<msquic::Status>,
    /// State of the receive direction.
    recv_state: StreamRecvState,
    /// Buffers queued by `Receive` events; cleared on `ShutdownComplete`.
    recv_buffers: VecDeque<StreamRecvBuffer>,
    /// Running total of bytes announced by `Receive` events.
    recv_len: usize,
    /// Byte ranges whose buffers were handed back through `read_complete`.
    read_complete_map: RangeSet<usize>,
    /// End offset of the prefix already reported via `receive_complete`.
    read_complete_cursor: usize,
    /// State of the send direction.
    send_state: StreamSendState,
    /// Write buffers returned by `SendComplete` events, reset and kept here
    /// (presumably for reuse by the write path — confirm against the caller).
    write_pool: Vec<WriteBuffer>,
    /// Error code from a `PeerSendAborted` event, if any.
    recv_error_code: Option<u64>,
    /// Error code from a `PeerReceiveAborted` event, if any.
    send_error_code: Option<u64>,
    /// Reason the connection went away, recorded on `ShutdownComplete`.
    conn_error: Option<ConnectionError>,
    /// Wakers drained on start completion, peer acceptance and shutdown.
    start_waiters: Vec<Waker>,
    /// Wakers drained when data arrives or the receive direction ends.
    read_waiters: Vec<Waker>,
    /// Wakers drained when the send direction reaches `ShutdownComplete`.
    write_shutdown_waiters: Vec<Waker>,
}
926
/// Stream data that does not need the `exclusive` mutex.
struct StreamInnerShared {
    /// Whether the stream is bidirectional or unidirectional.
    stream_type: StreamType,
    /// Flag passed to `StreamInner::new`; `Stream::open` passes `true`.
    local_open: bool,
    /// Stream id, filled in when a successful `StartComplete` event arrives.
    id: RwLock<Option<u64>>,
}
932
/// Overall lifecycle state of a stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamState {
    /// Initial state assigned by `StreamInner::new`.
    Open,
    /// Presumably set once a start has been requested — confirm at the call site.
    Start,
    /// The start completed and the peer accepted the stream.
    StartComplete,
    /// The `ShutdownComplete` event has been delivered.
    ShutdownComplete,
}
940
/// State of the receive direction of a stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamRecvState {
    /// Receive direction not (yet) usable; the value `Stream::open` starts with.
    Closed,
    /// Presumably set while the stream start is in flight — confirm at the call site.
    Start,
    /// Receiving is possible; set for bidirectional streams once started.
    StartComplete,
    /// Presumably set after a local receive shutdown request — confirm at the call site.
    Shutdown,
    /// The receive direction is finished (peer FIN/abort or stream shutdown).
    ShutdownComplete,
}
949
/// State of the send direction of a stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamSendState {
    /// Send direction not (yet) usable; the value `Stream::open` starts with.
    Closed,
    /// Presumably set while the stream start is in flight — confirm at the call site.
    Start,
    /// Sending is possible; set once the stream has started.
    StartComplete,
    /// Presumably set after a local send shutdown request — confirm at the call site.
    Shutdown,
    /// The send direction is finished (send shutdown, peer abort or stream shutdown).
    ShutdownComplete,
}
958
959impl StreamInner {
960    fn new(
961        stream_type: StreamType,
962        send_state: StreamSendState,
963        recv_state: StreamRecvState,
964        local_open: bool,
965    ) -> Self {
966        Self {
967            exclusive: Mutex::new(StreamInnerExclusive {
968                state: StreamState::Open,
969                start_status: None,
970                recv_state,
971                recv_buffers: VecDeque::new(),
972                recv_len: 0,
973                read_complete_map: RangeSet::new(),
974                read_complete_cursor: 0,
975                send_state,
976                write_pool: Vec::new(),
977                recv_error_code: None,
978                send_error_code: None,
979                conn_error: None,
980                start_waiters: Vec::new(),
981                read_waiters: Vec::new(),
982                write_shutdown_waiters: Vec::new(),
983            }),
984            shared: StreamInnerShared {
985                local_open,
986                id: RwLock::new(None),
987                stream_type,
988            },
989        }
990    }
991
992    fn handle_event_start_complete(
993        &self,
994        status: msquic::Status,
995        id: u64,
996        peer_accepted: bool,
997    ) -> Result<(), msquic::Status> {
998        if status.is_ok() {
999            self.shared.id.write().unwrap().replace(id);
1000        }
1001        trace!(
1002            "StreamInner({:p}, id={:?}) start complete status={:?}, peer_accepted={}, id={}",
1003            self,
1004            self.shared.id.read(),
1005            status,
1006            peer_accepted,
1007            id,
1008        );
1009        let mut exclusive = self.exclusive.lock().unwrap();
1010        exclusive.start_status = Some(status.clone());
1011        if status.is_ok() && peer_accepted {
1012            exclusive.state = StreamState::StartComplete;
1013            if self.shared.stream_type == StreamType::Bidirectional {
1014                exclusive.recv_state = StreamRecvState::StartComplete;
1015            }
1016            exclusive.send_state = StreamSendState::StartComplete;
1017        }
1018
1019        if status.0 == msquic::StatusCode::QUIC_STATUS_STREAM_LIMIT_REACHED.into() || peer_accepted
1020        {
1021            exclusive
1022                .start_waiters
1023                .drain(..)
1024                .for_each(|waker| waker.wake());
1025        }
1026        Ok(())
1027    }
1028
1029    fn handle_event_receive(
1030        &self,
1031        absolute_offset: u64,
1032        total_buffer_length: &mut u64,
1033        buffers: &[msquic::BufferRef],
1034        flags: msquic::ReceiveFlags,
1035    ) -> Result<(), msquic::Status> {
1036        trace!(
1037            "StreamInner({:p}, id={:?}) Receive {} offsets {} bytes, fin {}",
1038            self,
1039            self.shared.id.read(),
1040            absolute_offset,
1041            total_buffer_length,
1042            (flags & msquic::ReceiveFlags::FIN) == msquic::ReceiveFlags::FIN
1043        );
1044
1045        let recv_buffer = StreamRecvBuffer::new(
1046            absolute_offset as usize,
1047            buffers,
1048            (flags & msquic::ReceiveFlags::FIN) == msquic::ReceiveFlags::FIN,
1049        );
1050
1051        let mut exclusive = self.exclusive.lock().unwrap();
1052        exclusive.recv_len += *total_buffer_length as usize;
1053        exclusive.recv_buffers.push_back(recv_buffer);
1054        exclusive
1055            .read_waiters
1056            .drain(..)
1057            .for_each(|waker| waker.wake());
1058        Err(msquic::StatusCode::QUIC_STATUS_PENDING.into())
1059    }
1060
1061    fn handle_event_send_complete(
1062        &self,
1063        _canceled: bool,
1064        client_context: *const c_void,
1065    ) -> Result<(), msquic::Status> {
1066        trace!(
1067            "StreamInner({:p}, id={:?}) Send complete",
1068            self,
1069            self.shared.id.read()
1070        );
1071
1072        let mut write_buf = unsafe { WriteBuffer::from_raw(client_context) };
1073        let mut exclusive = self.exclusive.lock().unwrap();
1074        write_buf.reset();
1075        exclusive.write_pool.push(write_buf);
1076        Ok(())
1077    }
1078
1079    fn handle_event_peer_send_shutdown(&self) -> Result<(), msquic::Status> {
1080        trace!(
1081            "StreamInner({:p}, id={:?}) Peer send shutdown",
1082            self,
1083            self.shared.id.read()
1084        );
1085        let mut exclusive = self.exclusive.lock().unwrap();
1086        exclusive.recv_state = StreamRecvState::ShutdownComplete;
1087        exclusive
1088            .read_waiters
1089            .drain(..)
1090            .for_each(|waker| waker.wake());
1091        Ok(())
1092    }
1093
1094    fn handle_event_peer_send_aborted(&self, error_code: u64) -> Result<(), msquic::Status> {
1095        trace!(
1096            "StreamInner({:p}, id={:?}) Peer send aborted",
1097            self,
1098            self.shared.id.read()
1099        );
1100        let mut exclusive = self.exclusive.lock().unwrap();
1101        exclusive.recv_state = StreamRecvState::ShutdownComplete;
1102        exclusive.recv_error_code = Some(error_code);
1103        exclusive
1104            .read_waiters
1105            .drain(..)
1106            .for_each(|waker| waker.wake());
1107        Ok(())
1108    }
1109
1110    fn handle_event_peer_receive_aborted(&self, error_code: u64) -> Result<(), msquic::Status> {
1111        trace!(
1112            "StreamInner({:p}, id={:?}) Peer receive aborted",
1113            self,
1114            self.shared.id.read()
1115        );
1116        let mut exclusive = self.exclusive.lock().unwrap();
1117        exclusive.send_state = StreamSendState::ShutdownComplete;
1118        exclusive.send_error_code = Some(error_code);
1119        exclusive
1120            .write_shutdown_waiters
1121            .drain(..)
1122            .for_each(|waker| waker.wake());
1123        Ok(())
1124    }
1125
1126    fn handle_event_send_shutdown_complete(&self, _graceful: bool) -> Result<(), msquic::Status> {
1127        trace!(
1128            "StreamInner({:p}, id={:?}) Send shutdown complete",
1129            self,
1130            self.shared.id.read()
1131        );
1132        let mut exclusive = self.exclusive.lock().unwrap();
1133        exclusive.send_state = StreamSendState::ShutdownComplete;
1134        exclusive
1135            .write_shutdown_waiters
1136            .drain(..)
1137            .for_each(|waker| waker.wake());
1138        Ok(())
1139    }
1140
1141    #[allow(clippy::too_many_arguments)]
1142    fn handle_event_shutdown_complete(
1143        &self,
1144        msquic_stream: msquic::StreamRef,
1145        connection_shutdown: bool,
1146        app_close_in_progress: bool,
1147        connection_shutdown_by_app: bool,
1148        connection_closed_remotely: bool,
1149        connection_error_code: u64,
1150        connection_close_status: msquic::Status,
1151    ) -> Result<(), msquic::Status> {
1152        trace!(
1153            "StreamInner({:p}, id={:?}) Shutdown complete",
1154            self,
1155            self.shared.id.read()
1156        );
1157        {
1158            let mut exclusive = self.exclusive.lock().unwrap();
1159
1160            if !exclusive.recv_buffers.is_empty() {
1161                trace!(
1162                    "StreamInner({:p}) read complete {}",
1163                    self,
1164                    exclusive.recv_len - exclusive.read_complete_cursor
1165                );
1166                exclusive.recv_buffers.clear();
1167                if !app_close_in_progress {
1168                    msquic_stream.receive_complete(
1169                        (exclusive.recv_len - exclusive.read_complete_cursor) as u64,
1170                    );
1171                }
1172            }
1173
1174            exclusive.state = StreamState::ShutdownComplete;
1175            exclusive.recv_state = StreamRecvState::ShutdownComplete;
1176            exclusive.send_state = StreamSendState::ShutdownComplete;
1177            if connection_shutdown {
1178                match (connection_shutdown_by_app, connection_closed_remotely) {
1179                    (true, true) => {
1180                        exclusive.conn_error =
1181                            Some(ConnectionError::ShutdownByPeer(connection_error_code));
1182                    }
1183                    (true, false) => {
1184                        exclusive.conn_error = Some(ConnectionError::ShutdownByLocal);
1185                    }
1186                    (false, true) | (false, false) => {
1187                        exclusive.conn_error = Some(ConnectionError::ShutdownByTransport(
1188                            connection_close_status,
1189                            connection_error_code,
1190                        ));
1191                    }
1192                }
1193            }
1194            exclusive
1195                .start_waiters
1196                .drain(..)
1197                .for_each(|waker| waker.wake());
1198            exclusive
1199                .read_waiters
1200                .drain(..)
1201                .for_each(|waker| waker.wake());
1202        }
1203        // unsafe {
1204        //     Arc::from_raw(self as *const _);
1205        // }
1206        Ok(())
1207    }
1208
    /// Handles the `IdealSendBufferSize` event.
    ///
    /// The suggested `_byte_count` is currently ignored: the event is only
    /// traced and acknowledged with `Ok(())`.
    fn handle_event_ideal_send_buffer_size(&self, _byte_count: u64) -> Result<(), msquic::Status> {
        trace!(
            "StreamInner({:p}, id={:?}) Ideal send buffer size",
            self,
            self.shared.id.read()
        );
        Ok(())
    }
1217
1218    fn handle_event_peer_accepted(&self) -> Result<(), msquic::Status> {
1219        trace!(
1220            "StreamInner({:p}, id={:?}) Peer accepted",
1221            self,
1222            self.shared.id.read()
1223        );
1224        let mut exclusive = self.exclusive.lock().unwrap();
1225        exclusive.state = StreamState::StartComplete;
1226        if self.shared.stream_type == StreamType::Bidirectional {
1227            exclusive.recv_state = StreamRecvState::StartComplete;
1228        }
1229        exclusive.send_state = StreamSendState::StartComplete;
1230        exclusive
1231            .start_waiters
1232            .drain(..)
1233            .for_each(|waker| waker.wake());
1234        Ok(())
1235    }
1236
1237    fn callback_handler_impl(
1238        &self,
1239        msquic_stream: msquic::StreamRef,
1240        ev: msquic::StreamEvent,
1241    ) -> Result<(), msquic::Status> {
1242        match ev {
1243            msquic::StreamEvent::StartComplete {
1244                status,
1245                id,
1246                peer_accepted,
1247            } => self.handle_event_start_complete(status, id, peer_accepted),
1248            msquic::StreamEvent::Receive {
1249                absolute_offset,
1250                total_buffer_length,
1251                buffers,
1252                flags,
1253            } => self.handle_event_receive(absolute_offset, total_buffer_length, buffers, flags),
1254            msquic::StreamEvent::SendComplete {
1255                cancelled,
1256                client_context,
1257            } => self.handle_event_send_complete(cancelled, client_context),
1258            msquic::StreamEvent::PeerSendShutdown => self.handle_event_peer_send_shutdown(),
1259            msquic::StreamEvent::PeerSendAborted { error_code } => {
1260                self.handle_event_peer_send_aborted(error_code)
1261            }
1262            msquic::StreamEvent::PeerReceiveAborted { error_code } => {
1263                self.handle_event_peer_receive_aborted(error_code)
1264            }
1265            msquic::StreamEvent::SendShutdownComplete { graceful } => {
1266                self.handle_event_send_shutdown_complete(graceful)
1267            }
1268            msquic::StreamEvent::ShutdownComplete {
1269                connection_shutdown,
1270                app_close_in_progress,
1271                connection_shutdown_by_app,
1272                connection_closed_remotely,
1273                connection_error_code,
1274                connection_close_status,
1275            } => self.handle_event_shutdown_complete(
1276                msquic_stream,
1277                connection_shutdown,
1278                app_close_in_progress,
1279                connection_shutdown_by_app,
1280                connection_closed_remotely,
1281                connection_error_code,
1282                connection_close_status,
1283            ),
1284            msquic::StreamEvent::IdealSendBufferSize { byte_count } => {
1285                self.handle_event_ideal_send_buffer_size(byte_count)
1286            }
1287            msquic::StreamEvent::PeerAccepted => self.handle_event_peer_accepted(),
1288            _ => {
1289                trace!("StreamInner({:p}) Other callback", self);
1290                Ok(())
1291            }
1292        }
1293    }
1294}
1295
/// Diagnostic-only destructor: traces when the shared state is released.
impl Drop for StreamInner {
    fn drop(&mut self) {
        trace!("StreamInner({:p}) dropping", self);
    }
}
1301
1302impl fmt::Debug for StreamInnerExclusive {
1303    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1304        f.debug_struct("Exclusive")
1305            .field("state", &self.state)
1306            .field("recv_state", &self.recv_state)
1307            .field("send_state", &self.send_state)
1308            .finish()
1309    }
1310}
1311
1312impl fmt::Debug for StreamInnerShared {
1313    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1314        f.debug_struct("Shared")
1315            .field("type", &self.stream_type)
1316            .field("id", &self.id)
1317            .finish()
1318    }
1319}
1320pub struct ReadChunk<'a> {
1321    stream: &'a Arc<StreamInstance>,
1322}
1323
1324impl Future for ReadChunk<'_> {
1325    type Output = Result<Option<StreamRecvBuffer>, ReadError>;
1326
1327    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1328        self.stream.poll_read_chunk(cx)
1329    }
1330}
1331
1332pub struct WriteChunk<'a> {
1333    stream: &'a StreamInstance,
1334    chunk: &'a Bytes,
1335    fin: bool,
1336}
1337
1338impl Future for WriteChunk<'_> {
1339    type Output = Result<usize, WriteError>;
1340
1341    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1342        self.stream.poll_write_chunk(cx, self.chunk, self.fin)
1343    }
1344}
1345
1346pub struct WriteChunks<'a> {
1347    stream: &'a StreamInstance,
1348    chunks: &'a [Bytes],
1349    fin: bool,
1350}
1351
1352impl Future for WriteChunks<'_> {
1353    type Output = Result<usize, WriteError>;
1354
1355    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1356        self.stream.poll_write_chunks(cx, self.chunks, self.fin)
1357    }
1358}
1359
#[cfg(feature = "tokio")]
impl tokio::io::AsyncRead for Stream {
    /// Reads stream data into the unfilled part of `buf`.
    ///
    /// Delegates to the inherent `Stream::poll_read` and converts a
    /// `ReadError` into `std::io::Error`.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // Write only into the unfilled tail and then advance the fill cursor.
        // `initialized_mut()` also covers bytes that are already filled and is
        // empty for a fresh uninitialised buffer, so pairing it with
        // `set_filled(len)` would overwrite existing data or report a false
        // end-of-stream.
        let len = ready!(Self::poll_read(
            self.get_mut(),
            cx,
            buf.initialize_unfilled()
        ))?;
        buf.advance(len);
        Poll::Ready(Ok(()))
    }
}
1372
#[cfg(feature = "tokio")]
impl tokio::io::AsyncWrite for Stream {
    /// Writes `buf` without FIN through the inherent `Stream::poll_write`.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        Self::poll_write(self.get_mut(), cx, buf, false).map_err(std::io::Error::from)
    }

    /// No-op: always reports success immediately.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<std::io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    /// Finishes the send direction through `Stream::poll_finish_write`.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<std::io::Result<()>> {
        Self::poll_finish_write(self.get_mut(), cx)
            .map_ok(|_| ())
            .map_err(std::io::Error::from)
    }
}
1393
#[cfg(feature = "tokio")]
impl tokio::io::AsyncRead for ReadStream {
    /// Reads stream data into the unfilled part of `buf`.
    ///
    /// Delegates to the inherent `ReadStream::poll_read` and converts a
    /// `ReadError` into `std::io::Error`.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // Write only into the unfilled tail and then advance the fill cursor.
        // `initialized_mut()` also covers bytes that are already filled and is
        // empty for a fresh uninitialised buffer, so pairing it with
        // `set_filled(len)` would overwrite existing data or report a false
        // end-of-stream.
        let len = ready!(Self::poll_read(
            self.get_mut(),
            cx,
            buf.initialize_unfilled()
        ))?;
        buf.advance(len);
        Poll::Ready(Ok(()))
    }
}
1406
#[cfg(feature = "tokio")]
impl tokio::io::AsyncWrite for WriteStream {
    /// Writes `buf` without FIN through the inherent `WriteStream::poll_write`.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        match Self::poll_write(self.get_mut(), cx, buf, false) {
            Poll::Ready(Ok(written)) => Poll::Ready(Ok(written)),
            Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
            Poll::Pending => Poll::Pending,
        }
    }

    /// No-op: always reports success immediately.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<std::io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    /// Finishes the send direction through `WriteStream::poll_finish_write`.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<std::io::Result<()>> {
        match Self::poll_finish_write(self.get_mut(), cx) {
            Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
            Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
            Poll::Pending => Poll::Pending,
        }
    }
}
1427
1428impl futures_io::AsyncRead for Stream {
1429    fn poll_read(
1430        self: Pin<&mut Self>,
1431        cx: &mut Context<'_>,
1432        buf: &mut [u8],
1433    ) -> Poll<std::io::Result<usize>> {
1434        let len = ready!(Self::poll_read(self.get_mut(), cx, buf))?;
1435        Poll::Ready(Ok(len))
1436    }
1437}
1438
1439impl futures_io::AsyncWrite for Stream {
1440    fn poll_write(
1441        self: Pin<&mut Self>,
1442        cx: &mut Context<'_>,
1443        buf: &[u8],
1444    ) -> Poll<std::io::Result<usize>> {
1445        let len = ready!(Self::poll_write(self.get_mut(), cx, buf, false))?;
1446        Poll::Ready(Ok(len))
1447    }
1448
1449    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1450        Poll::Ready(Ok(()))
1451    }
1452
1453    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1454        ready!(Self::poll_finish_write(self.get_mut(), cx))?;
1455        Poll::Ready(Ok(()))
1456    }
1457}
1458
1459impl futures_io::AsyncRead for ReadStream {
1460    fn poll_read(
1461        self: Pin<&mut Self>,
1462        cx: &mut Context<'_>,
1463        buf: &mut [u8],
1464    ) -> Poll<std::io::Result<usize>> {
1465        let len = ready!(Self::poll_read(self.get_mut(), cx, buf))?;
1466        Poll::Ready(Ok(len))
1467    }
1468}
1469
1470impl futures_io::AsyncWrite for WriteStream {
1471    fn poll_write(
1472        self: Pin<&mut Self>,
1473        cx: &mut Context<'_>,
1474        buf: &[u8],
1475    ) -> Poll<std::io::Result<usize>> {
1476        let len = ready!(Self::poll_write(self.get_mut(), cx, buf, false))?;
1477        Poll::Ready(Ok(len))
1478    }
1479
1480    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1481        Poll::Ready(Ok(()))
1482    }
1483
1484    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1485        ready!(Self::poll_finish_write(self.get_mut(), cx))?;
1486        Poll::Ready(Ok(()))
1487    }
1488}
1489
/// Outcome of a read attempt.
enum ReadStatus<T> {
    /// Data is available.
    Readable(T),
    /// The read side is finished; carries whatever was read, if anything.
    Finished(Option<T>),
    /// No more progress is possible right now; carries whatever was read, if anything.
    Blocked(Option<T>),
}

/// Builds a status from `(value, finished)`: `Finished` when the flag is
/// `true`, `Blocked` otherwise. `Readable` is never produced here.
impl<T> From<(Option<T>, bool)> for ReadStatus<T> {
    fn from((read, finished): (Option<T>, bool)) -> Self {
        if finished {
            Self::Finished(read)
        } else {
            Self::Blocked(read)
        }
    }
}
1504
/// Outcome of a write attempt.
enum WriteStatus<T> {
    /// The stream can accept data.
    Writable(T),
    /// The write side is finished; carries whatever was written, if anything.
    Finished(Option<T>),
    /// No more progress is possible right now; carries whatever was written, if anything.
    Blocked(Option<T>),
}

/// Builds a status from `(value, finished)`: `Finished` when the flag is
/// `true`, `Blocked` otherwise. `Writable` is never produced here.
impl<T> From<(Option<T>, bool)> for WriteStatus<T> {
    fn from((written, finished): (Option<T>, bool)) -> Self {
        if finished {
            Self::Finished(written)
        } else {
            Self::Blocked(written)
        }
    }
}
1519
/// Errors reported when opening or starting a stream.
#[derive(Debug, Error, Clone)]
pub enum StartError {
    /// The connection has not been started yet.
    #[error("connection not started yet")]
    ConnectionNotStarted,
    /// The stream count limit has been reached.
    #[error("reach stream count limit")]
    LimitReached,
    /// The underlying connection was lost; wraps the connection error.
    #[error("connection lost")]
    ConnectionLost(#[from] ConnectionError),
    /// Any other failure, carrying the raw MsQuic status
    /// (used by `Stream::open` when opening the stream handle fails).
    #[error("other error: status {0:?}")]
    OtherError(msquic::Status),
}
1531
1532#[derive(Debug, Error, Clone)]
1533pub enum ReadError {
1534    #[error("stream not opened for reading")]
1535    Closed,
1536    #[error("stream reset by peer: error {0}")]
1537    Reset(u64),
1538    #[error("connection lost")]
1539    ConnectionLost(#[from] ConnectionError),
1540    #[error("other error: status {0:?}")]
1541    OtherError(msquic::Status),
1542}
1543
1544impl From<ReadError> for std::io::Error {
1545    fn from(e: ReadError) -> Self {
1546        let kind = match e {
1547            ReadError::Closed => std::io::ErrorKind::NotConnected,
1548            ReadError::Reset(_) => std::io::ErrorKind::ConnectionReset,
1549            ReadError::ConnectionLost(ConnectionError::ConnectionClosed) => {
1550                std::io::ErrorKind::NotConnected
1551            }
1552            ReadError::ConnectionLost(_) => std::io::ErrorKind::ConnectionAborted,
1553            ReadError::OtherError(_) => std::io::ErrorKind::Other,
1554        };
1555        Self::new(kind, e)
1556    }
1557}
1558
1559#[derive(Debug, Error, Clone)]
1560pub enum WriteError {
1561    #[error("stream not opened for writing")]
1562    Closed,
1563    #[error("stream finished")]
1564    Finished,
1565    #[error("stream stopped by peer: error {0}")]
1566    Stopped(u64),
1567    #[error("connection lost")]
1568    ConnectionLost(#[from] ConnectionError),
1569    #[error("other error: status {0:?}")]
1570    OtherError(msquic::Status),
1571}
1572
1573impl From<WriteError> for std::io::Error {
1574    fn from(e: WriteError) -> Self {
1575        let kind = match e {
1576            WriteError::Closed
1577            | WriteError::Finished
1578            | WriteError::ConnectionLost(ConnectionError::ConnectionClosed) => {
1579                std::io::ErrorKind::NotConnected
1580            }
1581            WriteError::Stopped(_) => std::io::ErrorKind::ConnectionReset,
1582            WriteError::ConnectionLost(_) => std::io::ErrorKind::ConnectionAborted,
1583            WriteError::OtherError(_) => std::io::ErrorKind::Other,
1584        };
1585        Self::new(kind, e)
1586    }
1587}