msquic_async/
stream.rs

1use crate::buffer::{StreamRecvBuffer, WriteBuffer};
2use crate::connection::ConnectionError;
3
4use std::collections::VecDeque;
5use std::fmt;
6use std::future::Future;
7use std::ops::Deref;
8use std::pin::Pin;
9use std::sync::{Arc, Mutex, RwLock};
10use std::task::{ready, Context, Poll, Waker};
11
12use bytes::Bytes;
13use libc::c_void;
14use rangemap::RangeSet;
15use thiserror::Error;
16use tracing::trace;
17
/// Directionality of a QUIC stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StreamType {
    /// Data can flow in both directions.
    Bidirectional,
    /// Data flows in one direction only: the side that opened the stream
    /// writes and the other side reads.
    Unidirectional,
}
23
24/// A stream represents a bidirectional or unidirectional stream.
25#[derive(Debug)]
26pub struct Stream(Arc<StreamInstance>);
27
28impl Stream {
29    pub(crate) fn open(
30        msquic_conn: &msquic::Connection,
31        stream_type: StreamType,
32    ) -> Result<Self, StartError> {
33        let msquic_stream = msquic::Stream::new();
34        let flags = if stream_type == StreamType::Unidirectional {
35            msquic::STREAM_OPEN_FLAG_UNIDIRECTIONAL
36        } else {
37            msquic::STREAM_OPEN_FLAG_NONE
38        };
39        let inner = Arc::new(StreamInner::new(
40            msquic_stream,
41            stream_type,
42            StreamSendState::Closed,
43            StreamRecvState::Closed,
44            true,
45        ));
46        inner
47            .shared
48            .msquic_stream
49            .open(
50                msquic_conn,
51                flags,
52                StreamInner::native_callback,
53                Arc::into_raw(inner.clone()) as *const c_void,
54            )
55            .map_err(StartError::OtherError)?;
56        trace!("Stream({:p}) Open by local", &*inner);
57
58        Ok(Self(Arc::new(StreamInstance(inner))))
59    }
60
61    pub(crate) fn from_handle(handle: msquic::Handle, stream_type: StreamType) -> Self {
62        let msquic_stream = msquic::Stream::from_parts(handle);
63        let send_state = if stream_type == StreamType::Bidirectional {
64            StreamSendState::StartComplete
65        } else {
66            StreamSendState::Closed
67        };
68        let inner = Arc::new(StreamInner::new(
69            msquic_stream,
70            stream_type,
71            send_state,
72            StreamRecvState::StartComplete,
73            false,
74        ));
75        inner.shared.msquic_stream.set_callback_handler(
76            StreamInner::native_callback,
77            Arc::into_raw(inner.clone()) as *const c_void,
78        );
79        let stream = Self(Arc::new(StreamInstance(inner)));
80        trace!(
81            "Stream({:p}, id={:?}) Start by peer",
82            &*stream.0 .0,
83            stream.id()
84        );
85        stream
86    }
87
88    pub(crate) fn poll_start(
89        &mut self,
90        cx: &mut Context,
91        failed_on_block: bool,
92    ) -> Poll<Result<(), StartError>> {
93        let mut exclusive = self.0.exclusive.lock().unwrap();
94        match exclusive.state {
95            StreamState::Open => {
96                self.0
97                    .shared
98                    .msquic_stream
99                    .start(
100                        msquic::STREAM_START_FLAG_SHUTDOWN_ON_FAIL
101                            | msquic::STREAM_START_FLAG_INDICATE_PEER_ACCEPT
102                            | if failed_on_block {
103                                msquic::STREAM_START_FLAG_FAIL_BLOCKED
104                            } else {
105                                msquic::STREAM_START_FLAG_NONE
106                            },
107                    )
108                    .map_err(StartError::OtherError)?;
109                exclusive.state = StreamState::Start;
110                if self.0.shared.stream_type == StreamType::Bidirectional {
111                    exclusive.recv_state = StreamRecvState::Start;
112                }
113                exclusive.send_state = StreamSendState::Start;
114            }
115            StreamState::Start => {}
116            _ => {
117                if let Some(start_status) = exclusive.start_status {
118                    if msquic::Status::succeeded(start_status) {
119                        return Poll::Ready(Ok(()));
120                    }
121                    return Poll::Ready(Err(match start_status {
122                        msquic::QUIC_STATUS_STREAM_LIMIT_REACHED => StartError::LimitReached,
123                        msquic::QUIC_STATUS_ABORTED | msquic::QUIC_STATUS_INVALID_STATE => {
124                            StartError::ConnectionLost(
125                                exclusive.conn_error.as_ref().expect("conn_error").clone(),
126                            )
127                        }
128                        _ => StartError::OtherError(start_status),
129                    }));
130                } else {
131                    return Poll::Ready(Ok(()));
132                }
133            }
134        }
135        exclusive.start_waiters.push(cx.waker().clone());
136        Poll::Pending
137    }
138
139    /// Returns the stream ID.
140    pub fn id(&self) -> Option<u64> {
141        self.0.id()
142    }
143
144    /// Splits the stream into a read stream and a write stream.
145    pub fn split(self) -> (Option<ReadStream>, Option<WriteStream>) {
146        match (self.0.shared.stream_type, self.0.shared.local_open) {
147            (StreamType::Unidirectional, true) => (None, Some(WriteStream(self.0))),
148            (StreamType::Unidirectional, false) => (Some(ReadStream(self.0)), None),
149            (StreamType::Bidirectional, _) => {
150                (Some(ReadStream(self.0.clone())), Some(WriteStream(self.0)))
151            }
152        }
153    }
154
155    /// Poll to read from the stream into buf.
156    pub fn poll_read(
157        &mut self,
158        cx: &mut Context<'_>,
159        buf: &mut [u8],
160    ) -> Poll<Result<usize, ReadError>> {
161        self.0.poll_read(cx, buf)
162    }
163
164    /// Poll to read the next segment of data.
165    pub fn poll_read_chunk(
166        &self,
167        cx: &mut Context<'_>,
168    ) -> Poll<Result<Option<StreamRecvBuffer>, ReadError>> {
169        self.0.poll_read_chunk(cx)
170    }
171
172    /// Read the next segment of data.
173    pub fn read_chunk(&self) -> ReadChunk<'_> {
174        self.0.read_chunk()
175    }
176
177    /// Poll to write to the stream from buf.
178    pub fn poll_write(
179        &mut self,
180        cx: &mut Context<'_>,
181        buf: &[u8],
182        fin: bool,
183    ) -> Poll<Result<usize, WriteError>> {
184        self.0.poll_write(cx, buf, fin)
185    }
186
187    /// Poll to write a bytes to the stream directly.
188    pub fn poll_write_chunk(
189        &mut self,
190        cx: &mut Context<'_>,
191        chunk: &Bytes,
192        fin: bool,
193    ) -> Poll<Result<usize, WriteError>> {
194        self.0.poll_write_chunk(cx, chunk, fin)
195    }
196
197    /// Write a bytes to the stream directly.
198    pub fn write_chunk<'a>(&'a mut self, chunk: &'a Bytes, fin: bool) -> WriteChunk<'a> {
199        self.0.write_chunk(chunk, fin)
200    }
201
202    /// Poll to write the list of bytes to the stream directly.
203    pub fn poll_write_chunks(
204        &mut self,
205        cx: &mut Context<'_>,
206        chunks: &[Bytes],
207        fin: bool,
208    ) -> Poll<Result<usize, WriteError>> {
209        self.0.poll_write_chunks(cx, chunks, fin)
210    }
211
212    /// Write the list of bytes to the stream directly.
213    pub fn write_chunks<'a>(&'a mut self, chunks: &'a [Bytes], fin: bool) -> WriteChunks<'a> {
214        self.0.write_chunks(chunks, fin)
215    }
216
217    /// Poll to finish writing to the stream.
218    pub fn poll_finish_write(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), WriteError>> {
219        self.0.poll_finish_write(cx)
220    }
221
222    /// Poll to abort writing to the stream.
223    pub fn poll_abort_write(
224        &mut self,
225        cx: &mut Context<'_>,
226        error_code: u64,
227    ) -> Poll<Result<(), WriteError>> {
228        self.0.poll_abort_write(cx, error_code)
229    }
230
231    /// Abort writing to the stream.
232    pub fn abort_write(&mut self, error_code: u64) -> Result<(), WriteError> {
233        self.0.abort_write(error_code)
234    }
235
236    /// Poll to abort reading from the stream.
237    pub fn poll_abort_read(
238        &mut self,
239        cx: &mut Context<'_>,
240        error_code: u64,
241    ) -> Poll<Result<(), ReadError>> {
242        self.0.poll_abort_read(cx, error_code)
243    }
244
245    /// Abort reading from the stream.
246    pub fn abort_read(&mut self, error_code: u64) -> Result<(), ReadError> {
247        self.0.abort_read(error_code)
248    }
249}
250
251/// A stream that can only be read from.
252#[derive(Debug)]
253pub struct ReadStream(Arc<StreamInstance>);
254
255impl ReadStream {
256    /// Returns the stream ID.
257    pub fn id(&self) -> Option<u64> {
258        self.0.id()
259    }
260
261    /// Poll to read from the stream into buf.
262    pub fn poll_read(
263        &mut self,
264        cx: &mut Context<'_>,
265        buf: &mut [u8],
266    ) -> Poll<Result<usize, ReadError>> {
267        self.0.poll_read(cx, buf)
268    }
269
270    /// Poll to read the next segment of data.
271    pub fn poll_read_chunk(
272        &self,
273        cx: &mut Context<'_>,
274    ) -> Poll<Result<Option<StreamRecvBuffer>, ReadError>> {
275        self.0.poll_read_chunk(cx)
276    }
277
278    /// Read the next segment of data.
279    pub fn read_chunk(&self) -> ReadChunk<'_> {
280        self.0.read_chunk()
281    }
282
283    /// Poll to abort reading from the stream.
284    pub fn poll_abort_read(
285        &mut self,
286        cx: &mut Context<'_>,
287        error_code: u64,
288    ) -> Poll<Result<(), ReadError>> {
289        self.0.poll_abort_read(cx, error_code)
290    }
291
292    /// Abort reading from the stream.
293    pub fn abort_read(&mut self, error_code: u64) -> Result<(), ReadError> {
294        self.0.abort_read(error_code)
295    }
296}
297
298/// A stream that can only be written to.
299#[derive(Debug)]
300pub struct WriteStream(Arc<StreamInstance>);
301
302impl WriteStream {
303    /// Returns the stream ID.
304    pub fn id(&self) -> Option<u64> {
305        self.0.id()
306    }
307
308    /// Poll to write to the stream from buf.
309    pub fn poll_write(
310        &mut self,
311        cx: &mut Context<'_>,
312        buf: &[u8],
313        fin: bool,
314    ) -> Poll<Result<usize, WriteError>> {
315        self.0.poll_write(cx, buf, fin)
316    }
317
318    /// Poll to write a bytes to the stream directly.
319    pub fn poll_write_chunk(
320        &mut self,
321        cx: &mut Context<'_>,
322        chunk: &Bytes,
323        fin: bool,
324    ) -> Poll<Result<usize, WriteError>> {
325        self.0.poll_write_chunk(cx, chunk, fin)
326    }
327
328    /// Write a bytes to the stream directly.
329    pub fn write_chunk<'a>(&'a mut self, chunk: &'a Bytes, fin: bool) -> WriteChunk<'a> {
330        self.0.write_chunk(chunk, fin)
331    }
332
333    /// Poll to write the list of bytes to the stream directly.
334    pub fn poll_write_chunks(
335        &mut self,
336        cx: &mut Context<'_>,
337        chunks: &[Bytes],
338        fin: bool,
339    ) -> Poll<Result<usize, WriteError>> {
340        self.0.poll_write_chunks(cx, chunks, fin)
341    }
342
343    /// Write the list of bytes to the stream directly.
344    pub fn write_chunks<'a>(&'a mut self, chunks: &'a [Bytes], fin: bool) -> WriteChunks<'a> {
345        self.0.write_chunks(chunks, fin)
346    }
347
348    /// Poll to finish writing to the stream.
349    pub fn poll_finish_write(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), WriteError>> {
350        self.0.poll_finish_write(cx)
351    }
352
353    /// Poll to abort writing to the stream.
354    pub fn poll_abort_write(
355        &mut self,
356        cx: &mut Context<'_>,
357        error_code: u64,
358    ) -> Poll<Result<(), WriteError>> {
359        self.0.poll_abort_write(cx, error_code)
360    }
361
362    /// Abort writing to the stream.
363    pub fn abort_write(&mut self, error_code: u64) -> Result<(), WriteError> {
364        self.0.abort_write(error_code)
365    }
366}
367
368impl StreamInstance {
369    pub(crate) fn id(&self) -> Option<u64> {
370        let id = { *self.0.shared.id.read().unwrap() };
371        if id.is_some() {
372            id
373        } else {
374            let id = 0u64;
375            let mut buffer_length = std::mem::size_of_val(&id) as u32;
376            let res = self.0.shared.msquic_stream.get_param(
377                msquic::PARAM_STREAM_ID,
378                &mut buffer_length as *mut _,
379                &id as *const _ as *const c_void,
380            );
381            if res.is_ok() {
382                self.0.shared.id.write().unwrap().replace(id);
383                Some(id)
384            } else {
385                None
386            }
387        }
388    }
389
390    pub(crate) fn poll_read(
391        &self,
392        cx: &mut Context<'_>,
393        buf: &mut [u8],
394    ) -> Poll<Result<usize, ReadError>> {
395        self.poll_read_generic(cx, |recv_buffers, read_complete_buffers| {
396            let mut read = 0;
397            let mut fin = false;
398            loop {
399                if read == buf.len() {
400                    return ReadStatus::Readable(read);
401                }
402
403                match recv_buffers
404                    .front_mut()
405                    .and_then(|x| x.get_bytes_upto_size(buf.len() - read))
406                {
407                    Some(slice) => {
408                        let len = slice.len();
409                        buf[read..read + len].copy_from_slice(slice);
410                        read += len;
411                    }
412                    None => {
413                        if let Some(mut recv_buffer) = recv_buffers.pop_front() {
414                            recv_buffer.set_stream(self.0.clone());
415                            fin = recv_buffer.fin();
416                            read_complete_buffers.push(recv_buffer);
417                            continue;
418                        } else {
419                            return (if read > 0 { Some(read) } else { None }, fin).into();
420                        }
421                    }
422                }
423            }
424        })
425        .map(|res| res.map(|x| x.unwrap_or(0)))
426    }
427
428    fn poll_read_chunk(
429        &self,
430        cx: &mut Context<'_>,
431    ) -> Poll<Result<Option<StreamRecvBuffer>, ReadError>> {
432        self.poll_read_generic(cx, |recv_buffers, _| {
433            recv_buffers
434                .pop_front()
435                .map(|mut recv_buffer| {
436                    let fin = recv_buffer.fin();
437                    recv_buffer.set_stream(self.0.clone());
438                    (Some(recv_buffer), fin)
439                })
440                .unwrap_or((None, false))
441                .into()
442        })
443    }
444
445    fn read_chunk(&self) -> ReadChunk<'_> {
446        ReadChunk { stream: self }
447    }
448
449    fn poll_read_generic<T, U>(
450        &self,
451        cx: &mut Context<'_>,
452        mut read_fn: T,
453    ) -> Poll<Result<Option<U>, ReadError>>
454    where
455        T: FnMut(&mut VecDeque<StreamRecvBuffer>, &mut Vec<StreamRecvBuffer>) -> ReadStatus<U>,
456    {
457        let res;
458        let mut read_complete_buffers = Vec::new();
459        {
460            let mut exclusive = self.0.exclusive.lock().unwrap();
461            match exclusive.recv_state {
462                StreamRecvState::Closed => {
463                    return Poll::Ready(Err(ReadError::Closed));
464                }
465                StreamRecvState::Start => {
466                    exclusive.start_waiters.push(cx.waker().clone());
467                    return Poll::Pending;
468                }
469                StreamRecvState::StartComplete => {}
470                StreamRecvState::Shutdown => {
471                    return Poll::Ready(Ok(None));
472                }
473                StreamRecvState::ShutdownComplete => {
474                    if let Some(conn_error) = &exclusive.conn_error {
475                        return Poll::Ready(Err(ReadError::ConnectionLost(conn_error.clone())));
476                    } else if let Some(error_code) = &exclusive.recv_error_code {
477                        return Poll::Ready(Err(ReadError::Reset(*error_code)));
478                    } else {
479                        return Poll::Ready(Ok(None));
480                    }
481                }
482            }
483
484            let status = read_fn(&mut exclusive.recv_buffers, &mut read_complete_buffers);
485
486            res = match status {
487                ReadStatus::Readable(read) | ReadStatus::Blocked(Some(read)) => {
488                    Poll::Ready(Ok(Some(read)))
489                }
490                ReadStatus::Finished(read) => {
491                    exclusive.recv_state = StreamRecvState::Shutdown;
492                    Poll::Ready(Ok(read))
493                }
494                ReadStatus::Blocked(None) => {
495                    exclusive.read_waiters.push(cx.waker().clone());
496                    Poll::Pending
497                }
498            };
499        }
500        res
501    }
502
503    pub(crate) fn poll_write(
504        &self,
505        cx: &mut Context<'_>,
506        buf: &[u8],
507        fin: bool,
508    ) -> Poll<Result<usize, WriteError>> {
509        self.poll_write_generic(cx, |write_buf| {
510            let written = write_buf.put_slice(buf);
511            if written == buf.len() && !fin {
512                WriteStatus::Writable(written)
513            } else {
514                (Some(written), fin).into()
515            }
516        })
517        .map(|res| res.map(|x| x.unwrap_or(0)))
518    }
519
520    pub(crate) fn poll_write_chunk(
521        &self,
522        cx: &mut Context<'_>,
523        chunk: &Bytes,
524        fin: bool,
525    ) -> Poll<Result<usize, WriteError>> {
526        self.poll_write_generic(cx, |write_buf| {
527            let written = write_buf.put_zerocopy(chunk);
528            if written == chunk.len() && !fin {
529                WriteStatus::Writable(written)
530            } else {
531                (Some(written), fin).into()
532            }
533        })
534        .map(|res| res.map(|x| x.unwrap_or(0)))
535    }
536
537    pub(crate) fn write_chunk<'a>(&'a self, chunk: &'a Bytes, fin: bool) -> WriteChunk<'a> {
538        WriteChunk {
539            stream: self,
540            chunk,
541            fin,
542        }
543    }
544
545    fn poll_write_chunks(
546        &self,
547        cx: &mut Context<'_>,
548        chunks: &[Bytes],
549        fin: bool,
550    ) -> Poll<Result<usize, WriteError>> {
551        self.poll_write_generic(cx, |write_buf| {
552            let (mut total_len, mut total_written) = (0, 0);
553            for buf in chunks {
554                total_len += buf.len();
555                total_written += write_buf.put_zerocopy(buf);
556            }
557            if total_written == total_len && !fin {
558                WriteStatus::Writable(total_written)
559            } else {
560                (Some(total_written), fin).into()
561            }
562        })
563        .map(|res| res.map(|x| x.unwrap_or(0)))
564    }
565
566    pub(crate) fn write_chunks<'a>(&'a self, chunks: &'a [Bytes], fin: bool) -> WriteChunks<'a> {
567        WriteChunks {
568            stream: self,
569            chunks,
570            fin,
571        }
572    }
573
574    fn poll_write_generic<T, U>(
575        &self,
576        _cx: &mut Context<'_>,
577        mut write_fn: T,
578    ) -> Poll<Result<Option<U>, WriteError>>
579    where
580        T: FnMut(&mut WriteBuffer) -> WriteStatus<U>,
581    {
582        let mut exclusive = self.0.exclusive.lock().unwrap();
583        match exclusive.send_state {
584            StreamSendState::Closed => {
585                return Poll::Ready(Err(WriteError::Closed));
586            }
587            StreamSendState::Start => {
588                exclusive.start_waiters.push(_cx.waker().clone());
589                return Poll::Pending;
590            }
591            StreamSendState::StartComplete => {}
592            StreamSendState::Shutdown => {
593                return Poll::Ready(Err(WriteError::Finished));
594            }
595            StreamSendState::ShutdownComplete => {
596                if let Some(conn_error) = &exclusive.conn_error {
597                    return Poll::Ready(Err(WriteError::ConnectionLost(conn_error.clone())));
598                } else if let Some(error_code) = &exclusive.send_error_code {
599                    return Poll::Ready(Err(WriteError::Stopped(*error_code)));
600                } else {
601                    return Poll::Ready(Err(WriteError::Finished));
602                }
603            }
604        }
605        let mut write_buf = exclusive.write_pool.pop().unwrap_or(WriteBuffer::new());
606        let status = write_fn(&mut write_buf);
607        let (buffer, buffer_count) = write_buf.get_buffer();
608        match status {
609            WriteStatus::Writable(val) | WriteStatus::Blocked(Some(val)) => {
610                match self
611                    .0
612                    .shared
613                    .msquic_stream
614                    .send(
615                        unsafe { &*buffer },
616                        buffer_count,
617                        msquic::SEND_FLAG_NONE,
618                        write_buf.into_raw() as *const _,
619                    )
620                    .map_err(WriteError::OtherError)
621                {
622                    Ok(()) => Poll::Ready(Ok(Some(val))),
623                    Err(e) => Poll::Ready(Err(e)),
624                }
625            }
626            WriteStatus::Blocked(None) => unreachable!(),
627            WriteStatus::Finished(val) => {
628                match self
629                    .0
630                    .shared
631                    .msquic_stream
632                    .send(
633                        unsafe { &*buffer },
634                        buffer_count,
635                        msquic::SEND_FLAG_FIN,
636                        write_buf.into_raw() as *const _,
637                    )
638                    .map_err(WriteError::OtherError)
639                {
640                    Ok(()) => {
641                        exclusive.send_state = StreamSendState::Shutdown;
642                        Poll::Ready(Ok(val))
643                    }
644                    Err(e) => Poll::Ready(Err(e)),
645                }
646            }
647        }
648    }
649
650    pub(crate) fn poll_finish_write(&self, cx: &mut Context<'_>) -> Poll<Result<(), WriteError>> {
651        let mut exclusive = self.0.exclusive.lock().unwrap();
652        match exclusive.send_state {
653            StreamSendState::Start => {
654                exclusive.start_waiters.push(cx.waker().clone());
655                return Poll::Pending;
656            }
657            StreamSendState::StartComplete => {
658                match self
659                    .0
660                    .shared
661                    .msquic_stream
662                    .shutdown(msquic::STREAM_SHUTDOWN_FLAG_GRACEFUL, 0)
663                    .map_err(WriteError::OtherError)
664                {
665                    Ok(()) => {
666                        exclusive.send_state = StreamSendState::Shutdown;
667                    }
668                    Err(e) => return Poll::Ready(Err(e)),
669                }
670            }
671            StreamSendState::Shutdown => {}
672            StreamSendState::ShutdownComplete => {
673                if let Some(conn_error) = &exclusive.conn_error {
674                    return Poll::Ready(Err(WriteError::ConnectionLost(conn_error.clone())));
675                } else if let Some(error_code) = &exclusive.send_error_code {
676                    return Poll::Ready(Err(WriteError::Stopped(*error_code)));
677                } else {
678                    return Poll::Ready(Ok(()));
679                }
680            }
681            _ => {
682                return Poll::Ready(Err(WriteError::Closed));
683            }
684        }
685        exclusive.write_shutdown_waiters.push(cx.waker().clone());
686        Poll::Pending
687    }
688
689    pub(crate) fn poll_abort_write(
690        &self,
691        cx: &mut Context<'_>,
692        error_code: u64,
693    ) -> Poll<Result<(), WriteError>> {
694        let mut exclusive = self.0.exclusive.lock().unwrap();
695        match exclusive.send_state {
696            StreamSendState::Start => {
697                exclusive.start_waiters.push(cx.waker().clone());
698                return Poll::Pending;
699            }
700            StreamSendState::StartComplete => {
701                match self
702                    .0
703                    .shared
704                    .msquic_stream
705                    .shutdown(msquic::STREAM_SHUTDOWN_FLAG_ABORT_SEND, error_code)
706                    .map_err(WriteError::OtherError)
707                {
708                    Ok(()) => {
709                        exclusive.send_state = StreamSendState::Shutdown;
710                    }
711                    Err(e) => return Poll::Ready(Err(e)),
712                }
713            }
714            StreamSendState::Shutdown => {}
715            StreamSendState::ShutdownComplete => {
716                if let Some(conn_error) = &exclusive.conn_error {
717                    return Poll::Ready(Err(WriteError::ConnectionLost(conn_error.clone())));
718                } else if let Some(error_code) = &exclusive.send_error_code {
719                    return Poll::Ready(Err(WriteError::Stopped(*error_code)));
720                } else {
721                    return Poll::Ready(Ok(()));
722                }
723            }
724            _ => {
725                return Poll::Ready(Err(WriteError::Closed));
726            }
727        }
728        exclusive.write_shutdown_waiters.push(cx.waker().clone());
729        Poll::Pending
730    }
731
732    pub(crate) fn abort_write(&self, error_code: u64) -> Result<(), WriteError> {
733        let mut exclusive = self.0.exclusive.lock().unwrap();
734        match exclusive.send_state {
735            StreamSendState::StartComplete => {
736                self.0
737                    .shared
738                    .msquic_stream
739                    .shutdown(msquic::STREAM_SHUTDOWN_FLAG_ABORT_SEND, error_code)
740                    .map_err(WriteError::OtherError)?;
741                exclusive.send_state = StreamSendState::Shutdown;
742                Ok(())
743            }
744            _ => Err(WriteError::Closed),
745        }
746    }
747
748    pub(crate) fn poll_abort_read(
749        &self,
750        cx: &mut Context<'_>,
751        error_code: u64,
752    ) -> Poll<Result<(), ReadError>> {
753        let mut exclusive = self.0.exclusive.lock().unwrap();
754        match exclusive.recv_state {
755            StreamRecvState::Start => {
756                exclusive.start_waiters.push(cx.waker().clone());
757                Poll::Pending
758            }
759            StreamRecvState::StartComplete => {
760                match self
761                    .0
762                    .shared
763                    .msquic_stream
764                    .shutdown(msquic::STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, error_code)
765                    .map_err(ReadError::OtherError)
766                {
767                    Ok(()) => {
768                        exclusive.recv_state = StreamRecvState::ShutdownComplete;
769                        exclusive
770                            .read_waiters
771                            .drain(..)
772                            .for_each(|waker| waker.wake());
773                        Poll::Ready(Ok(()))
774                    }
775                    Err(e) => Poll::Ready(Err(e)),
776                }
777            }
778            StreamRecvState::ShutdownComplete => {
779                if let Some(conn_error) = &exclusive.conn_error {
780                    Poll::Ready(Err(ReadError::ConnectionLost(conn_error.clone())))
781                } else if let Some(error_code) = &exclusive.recv_error_code {
782                    Poll::Ready(Err(ReadError::Reset(*error_code)))
783                } else {
784                    Poll::Ready(Ok(()))
785                }
786            }
787            _ => Poll::Ready(Err(ReadError::Closed)),
788        }
789    }
790
791    pub(crate) fn abort_read(&self, error_code: u64) -> Result<(), ReadError> {
792        let mut exclusive = self.0.exclusive.lock().unwrap();
793        match exclusive.recv_state {
794            StreamRecvState::StartComplete => {
795                self.0
796                    .shared
797                    .msquic_stream
798                    .shutdown(msquic::STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, error_code)
799                    .map_err(ReadError::OtherError)?;
800                exclusive.recv_state = StreamRecvState::ShutdownComplete;
801            }
802            _ => {
803                return Err(ReadError::Closed);
804            }
805        }
806        Ok(())
807    }
808}
809#[derive(Clone, Debug)]
810struct StreamInstance(Arc<StreamInner>);
811
812impl Drop for StreamInstance {
813    fn drop(&mut self) {
814        trace!("StreamInstance({:p}) dropping", &*self.0);
815        let mut exclusive = self.0.exclusive.lock().unwrap();
816        exclusive.recv_buffers.clear();
817        match exclusive.state {
818            StreamState::Start | StreamState::StartComplete => {
819                trace!("StreamInstance({:p}) shutdown while dropping", &*self.0);
820                let _ = self.0.shared.msquic_stream.shutdown(
821                    msquic::STREAM_SHUTDOWN_FLAG_ABORT_SEND
822                        | msquic::STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE
823                        | msquic::STREAM_SHUTDOWN_FLAG_IMMEDIATE,
824                    0,
825                );
826            }
827            _ => {}
828        }
829    }
830}
831
832impl Deref for StreamInstance {
833    type Target = StreamInner;
834
835    fn deref(&self) -> &Self::Target {
836        &self.0
837    }
838}
839
/// State of a single stream, split into a mutex-guarded part and a part that
/// is accessed without taking that mutex.
#[derive(Debug)]
pub(crate) struct StreamInner {
    /// Mutable state; every access goes through this mutex.
    exclusive: Mutex<StreamInnerExclusive>,
    /// Fields read without locking `exclusive`, including the stream handle.
    pub(crate) shared: StreamInnerShared,
}
845
/// The part of the stream state that is protected by `StreamInner::exclusive`.
struct StreamInnerExclusive {
    /// Overall lifecycle state; starts as `Open` and becomes `Start` once
    /// `poll_start` has issued the start request.
    state: StreamState,
    /// Status consulted by `poll_start` after the stream has left the `Start`
    /// state; `None` is treated as success there.
    start_status: Option<u32>,
    /// State of the receive direction.
    recv_state: StreamRecvState,
    /// Received buffers waiting to be consumed by the read operations.
    recv_buffers: VecDeque<StreamRecvBuffer>,
    /// Ranges of received data that have been reported as read via `read_complete`.
    read_complete_map: RangeSet<usize>,
    /// NOTE(review): presumably the offset up to which read completion has been
    /// acknowledged — the logic using it continues beyond this excerpt; confirm.
    read_complete_cursor: usize,
    /// State of the send direction.
    send_state: StreamSendState,
    /// Write buffers available for reuse; `poll_write_generic` pops from here.
    write_pool: Vec<WriteBuffer>,
    /// Error code surfaced to readers as `ReadError::Reset`.
    recv_error_code: Option<u64>,
    /// Error code surfaced to writers as `WriteError::Stopped`.
    send_error_code: Option<u64>,
    /// Connection error surfaced as the `ConnectionLost` error variants.
    conn_error: Option<ConnectionError>,
    /// Wakers of tasks parked while the stream is still starting.
    start_waiters: Vec<Waker>,
    /// Wakers of tasks parked waiting for received data.
    read_waiters: Vec<Waker>,
    /// Wakers of tasks parked waiting for the send shutdown to complete.
    write_shutdown_waiters: Vec<Waker>,
}
862
/// The part of the stream state that is accessed without locking
/// `StreamInner::exclusive`.
pub(crate) struct StreamInnerShared {
    /// Whether the stream is bidirectional or unidirectional.
    stream_type: StreamType,
    /// `true` for streams created by `Stream::open`, `false` for streams wrapped
    /// by `Stream::from_handle`.
    local_open: bool,
    /// Cached stream ID; filled in by the first successful `id()` query.
    id: RwLock<Option<u64>>,
    /// The underlying MsQuic stream.
    pub(crate) msquic_stream: msquic::Stream,
}
869
/// Overall lifecycle state of a stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamState {
    /// Initial state of a newly constructed stream.
    Open,
    /// A start request has been issued and has not completed yet.
    Start,
    /// The stream has started.
    StartComplete,
    /// The stream has been shut down completely.
    ShutdownComplete,
}
877
/// State of the receive direction of a stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamRecvState {
    /// The receive direction is not available; reads fail with `ReadError::Closed`.
    Closed,
    /// The stream is still starting; readers are parked until it has started.
    Start,
    /// The stream has started and data can be read.
    StartComplete,
    /// The reader has consumed the end of the stream; reads yield no more data.
    Shutdown,
    /// The receive direction has been shut down or aborted.
    ShutdownComplete,
}
886
/// State of the send direction of a stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamSendState {
    /// The send direction is not available; writes fail with `WriteError::Closed`.
    Closed,
    /// The stream is still starting; writers are parked until it has started.
    Start,
    /// The stream has started and data can be written.
    StartComplete,
    /// A finish or abort has been issued and has not completed yet.
    Shutdown,
    /// The send direction has been shut down completely.
    ShutdownComplete,
}
895
896impl StreamInner {
897    fn new(
898        msquic_stream: msquic::Stream,
899        stream_type: StreamType,
900        send_state: StreamSendState,
901        recv_state: StreamRecvState,
902        local_open: bool,
903    ) -> Self {
904        Self {
905            exclusive: Mutex::new(StreamInnerExclusive {
906                state: StreamState::Open,
907                start_status: None,
908                recv_state,
909                recv_buffers: VecDeque::new(),
910                read_complete_map: RangeSet::new(),
911                read_complete_cursor: 0,
912                send_state,
913                write_pool: Vec::new(),
914                recv_error_code: None,
915                send_error_code: None,
916                conn_error: None,
917                start_waiters: Vec::new(),
918                read_waiters: Vec::new(),
919                write_shutdown_waiters: Vec::new(),
920            }),
921            shared: StreamInnerShared {
922                msquic_stream,
923                local_open,
924                id: RwLock::new(None),
925                stream_type,
926            },
927        }
928    }
929
930    pub(crate) fn read_complete(&self, buffer: &StreamRecvBuffer) {
931        let buffer_range = buffer.range();
932        trace!(
933            "StreamInner({:p}) read complete offset={} len={}",
934            self,
935            buffer_range.start,
936            buffer_range.end - buffer_range.start
937        );
938
939        let complete_len = if !buffer_range.is_empty() {
940            let mut exclusive = self.exclusive.lock().unwrap();
941            exclusive.read_complete_map.insert(buffer_range);
942            let complete_range = exclusive.read_complete_map.first().unwrap();
943            trace!(
944                "StreamInner({:p}) complete read offset={} len={}",
945                self,
946                complete_range.start,
947                complete_range.end - complete_range.start
948            );
949            if complete_range.start == 0 && exclusive.read_complete_cursor < complete_range.end {
950                let complete_len = complete_range.end - exclusive.read_complete_cursor;
951                exclusive.read_complete_cursor = complete_range.end;
952                complete_len
953            } else {
954                0
955            }
956        } else {
957            0
958        };
959        if complete_len > 0 {
960            trace!(
961                "StreamInner({:p}) call receive_complete len={}",
962                self,
963                complete_len
964            );
965            let _ = self
966                .shared
967                .msquic_stream
968                .receive_complete(complete_len as u64);
969        }
970    }
971
972    fn handle_event_start_complete(
973        _stream: msquic::Handle,
974        inner: &Self,
975        payload: &msquic::StreamEventStartComplete,
976    ) -> u32 {
977        if msquic::Status::succeeded(payload.status) {
978            inner.shared.id.write().unwrap().replace(payload.id);
979        }
980        trace!(
981            "Stream({:p}, id={:?}) start complete status=0x{:x}, peer_accepted={}, id={}",
982            inner,
983            inner.shared.id.read(),
984            payload.status,
985            payload.bit_flags.peer_accepted(),
986            payload.id,
987        );
988        let mut exclusive = inner.exclusive.lock().unwrap();
989        exclusive.start_status = Some(payload.status);
990        if msquic::Status::succeeded(payload.status) && payload.bit_flags.peer_accepted() == 1 {
991            exclusive.state = StreamState::StartComplete;
992            if inner.shared.stream_type == StreamType::Bidirectional {
993                exclusive.recv_state = StreamRecvState::StartComplete;
994            }
995            exclusive.send_state = StreamSendState::StartComplete;
996        }
997
998        if payload.status == msquic::QUIC_STATUS_STREAM_LIMIT_REACHED
999            || payload.bit_flags.peer_accepted() == 1
1000        {
1001            exclusive
1002                .start_waiters
1003                .drain(..)
1004                .for_each(|waker| waker.wake());
1005        }
1006        msquic::QUIC_STATUS_SUCCESS
1007    }
1008
1009    fn handle_event_receive(
1010        _stream: msquic::Handle,
1011        inner: &Self,
1012        payload: &msquic::StreamEventReceive,
1013    ) -> u32 {
1014        trace!(
1015            "Stream({:p}, id={:?}) Receive {} offsets {} bytes, fin {}",
1016            inner,
1017            inner.shared.id.read(),
1018            payload.absolute_offset,
1019            payload.total_buffer_length,
1020            (payload.flags & msquic::RECEIVE_FLAG_FIN) == msquic::RECEIVE_FLAG_FIN
1021        );
1022
1023        let buffers =
1024            unsafe { std::slice::from_raw_parts(payload.buffer, payload.buffer_count as usize) };
1025
1026        let arc_inner: Arc<Self> = unsafe { Arc::from_raw(inner as *const _) };
1027
1028        let recv_buffer = StreamRecvBuffer::new(
1029            payload.absolute_offset as usize,
1030            &buffers,
1031            (payload.flags & msquic::RECEIVE_FLAG_FIN) == msquic::RECEIVE_FLAG_FIN,
1032        );
1033
1034        let _ = Arc::into_raw(arc_inner);
1035
1036        let mut exclusive = inner.exclusive.lock().unwrap();
1037        exclusive.recv_buffers.push_back(recv_buffer);
1038        exclusive
1039            .read_waiters
1040            .drain(..)
1041            .for_each(|waker| waker.wake());
1042        msquic::QUIC_STATUS_PENDING
1043    }
1044
1045    fn handle_event_send_complete(
1046        _stream: msquic::Handle,
1047        inner: &Self,
1048        payload: &msquic::StreamEventSendComplete,
1049    ) -> u32 {
1050        trace!(
1051            "Stream({:p}, id={:?}) Send complete",
1052            inner,
1053            inner.shared.id.read()
1054        );
1055
1056        let mut write_buf = unsafe { WriteBuffer::from_raw(payload.client_context) };
1057        let mut exclusive = inner.exclusive.lock().unwrap();
1058        write_buf.reset();
1059        exclusive.write_pool.push(write_buf);
1060        msquic::QUIC_STATUS_SUCCESS
1061    }
1062
1063    fn handle_event_peer_send_shutdown(_stream: msquic::Handle, inner: &Self) -> u32 {
1064        trace!(
1065            "Stream({:p}, id={:?}) Peer send shutdown",
1066            inner,
1067            inner.shared.id.read()
1068        );
1069        let mut exclusive = inner.exclusive.lock().unwrap();
1070        exclusive.recv_state = StreamRecvState::ShutdownComplete;
1071        exclusive
1072            .read_waiters
1073            .drain(..)
1074            .for_each(|waker| waker.wake());
1075        msquic::QUIC_STATUS_SUCCESS
1076    }
1077
1078    fn handle_event_peer_send_aborted(
1079        _stream: msquic::Handle,
1080        inner: &Self,
1081        payload: &msquic::StreamEventPeerSendAborted,
1082    ) -> u32 {
1083        trace!(
1084            "Stream({:p}, id={:?}) Peer send aborted",
1085            inner,
1086            inner.shared.id.read()
1087        );
1088        let mut exclusive = inner.exclusive.lock().unwrap();
1089        exclusive.recv_state = StreamRecvState::ShutdownComplete;
1090        exclusive.recv_error_code = Some(payload.error_code);
1091        exclusive
1092            .read_waiters
1093            .drain(..)
1094            .for_each(|waker| waker.wake());
1095        msquic::QUIC_STATUS_SUCCESS
1096    }
1097
1098    fn handle_event_peer_receive_aborted(
1099        _stream: msquic::Handle,
1100        inner: &Self,
1101        payload: &msquic::StreamEventPeerReceiveAborted,
1102    ) -> u32 {
1103        trace!(
1104            "Stream({:p}, id={:?}) Peer receive aborted",
1105            inner,
1106            inner.shared.id.read()
1107        );
1108        let mut exclusive = inner.exclusive.lock().unwrap();
1109        exclusive.send_state = StreamSendState::ShutdownComplete;
1110        exclusive.send_error_code = Some(payload.error_code);
1111        exclusive
1112            .write_shutdown_waiters
1113            .drain(..)
1114            .for_each(|waker| waker.wake());
1115        msquic::QUIC_STATUS_SUCCESS
1116    }
1117
1118    fn handle_event_send_shutdown_complete(
1119        _stream: msquic::Handle,
1120        inner: &Self,
1121        _payload: &msquic::StreamEventSendShutdownComplete,
1122    ) -> u32 {
1123        trace!(
1124            "Stream({:p}, id={:?}) Send shutdown complete",
1125            inner,
1126            inner.shared.id.read()
1127        );
1128        let mut exclusive = inner.exclusive.lock().unwrap();
1129        exclusive.send_state = StreamSendState::ShutdownComplete;
1130        exclusive
1131            .write_shutdown_waiters
1132            .drain(..)
1133            .for_each(|waker| waker.wake());
1134        msquic::QUIC_STATUS_SUCCESS
1135    }
1136
1137    fn handle_event_shutdown_complete(
1138        _stream: msquic::Handle,
1139        inner: &Self,
1140        payload: &msquic::StreamEventShutdownComplete,
1141    ) -> u32 {
1142        trace!(
1143            "Stream({:p}, id={:?}) Shutdown complete",
1144            inner,
1145            inner.shared.id.read()
1146        );
1147        {
1148            let mut exclusive = inner.exclusive.lock().unwrap();
1149            exclusive.state = StreamState::ShutdownComplete;
1150            exclusive.recv_state = StreamRecvState::ShutdownComplete;
1151            exclusive.send_state = StreamSendState::ShutdownComplete;
1152            if payload.connection_shutdown {
1153                match (
1154                    payload.bit_flags.conn_shutdown_by_app() == 1,
1155                    payload.bit_flags.conn_closed_remotely() == 1,
1156                ) {
1157                    (true, true) => {
1158                        exclusive.conn_error = Some(ConnectionError::ShutdownByPeer(
1159                            payload.connection_error_code,
1160                        ));
1161                    }
1162                    (true, false) => {
1163                        exclusive.conn_error = Some(ConnectionError::ShutdownByLocal);
1164                    }
1165                    (false, true) | (false, false) => {
1166                        exclusive.conn_error = Some(ConnectionError::ShutdownByTransport(
1167                            payload.connection_close_status,
1168                            payload.connection_error_code,
1169                        ));
1170                    }
1171                }
1172            }
1173            exclusive
1174                .start_waiters
1175                .drain(..)
1176                .for_each(|waker| waker.wake());
1177            exclusive
1178                .read_waiters
1179                .drain(..)
1180                .for_each(|waker| waker.wake());
1181        }
1182        unsafe {
1183            Arc::from_raw(inner as *const _);
1184        }
1185        msquic::QUIC_STATUS_SUCCESS
1186    }
1187
    /// Handles the ideal-send-buffer-size stream event.
    ///
    /// The payload is ignored; the event is only traced.
    fn handle_event_ideal_send_buffer_size(
        _stream: msquic::Handle,
        inner: &Self,
        _payload: &msquic::StreamEventIdealSendBufferSize,
    ) -> u32 {
        trace!(
            "Stream({:p}, id={:?}) Ideal send buffer size",
            inner,
            inner.shared.id.read()
        );
        msquic::QUIC_STATUS_SUCCESS
    }
1200
1201    fn handle_event_peer_accepted(_stream: msquic::Handle, inner: &Self) -> u32 {
1202        trace!(
1203            "Stream({:p}, id={:?}) Peer accepted",
1204            inner,
1205            inner.shared.id.read()
1206        );
1207        let mut exclusive = inner.exclusive.lock().unwrap();
1208        exclusive.state = StreamState::StartComplete;
1209        if inner.shared.stream_type == StreamType::Bidirectional {
1210            exclusive.recv_state = StreamRecvState::StartComplete;
1211        }
1212        exclusive.send_state = StreamSendState::StartComplete;
1213        exclusive
1214            .start_waiters
1215            .drain(..)
1216            .for_each(|waker| waker.wake());
1217        msquic::QUIC_STATUS_SUCCESS
1218    }
1219
1220    extern "C" fn native_callback(
1221        stream: msquic::Handle,
1222        context: *mut c_void,
1223        event: &msquic::StreamEvent,
1224    ) -> u32 {
1225        let inner = unsafe { &*(context as *const Self) };
1226
1227        match event.event_type {
1228            msquic::STREAM_EVENT_START_COMPLETE => {
1229                Self::handle_event_start_complete(stream, inner, unsafe {
1230                    &event.payload.start_complete
1231                })
1232            }
1233            msquic::STREAM_EVENT_RECEIVE => {
1234                Self::handle_event_receive(stream, inner, unsafe { &event.payload.receive })
1235            }
1236            msquic::STREAM_EVENT_SEND_COMPLETE => {
1237                Self::handle_event_send_complete(stream, inner, unsafe {
1238                    &event.payload.send_complete
1239                })
1240            }
1241            msquic::STREAM_EVENT_PEER_SEND_SHUTDOWN => {
1242                Self::handle_event_peer_send_shutdown(stream, inner)
1243            }
1244            msquic::STREAM_EVENT_PEER_SEND_ABORTED => {
1245                Self::handle_event_peer_send_aborted(stream, inner, unsafe {
1246                    &event.payload.peer_send_aborted
1247                })
1248            }
1249            msquic::STREAM_EVENT_PEER_RECEIVE_ABORTED => {
1250                Self::handle_event_peer_receive_aborted(stream, inner, unsafe {
1251                    &event.payload.peer_receive_aborted
1252                })
1253            }
1254            msquic::STREAM_EVENT_SEND_SHUTDOWN_COMPLETE => {
1255                Self::handle_event_send_shutdown_complete(stream, inner, unsafe {
1256                    &event.payload.send_shutdown_complete
1257                })
1258            }
1259            msquic::STREAM_EVENT_SHUTDOWN_COMPLETE => {
1260                Self::handle_event_shutdown_complete(stream, inner, unsafe {
1261                    &event.payload.shutdown_complete
1262                })
1263            }
1264            msquic::STREAM_EVENT_IDEAL_SEND_BUFFER_SIZE => {
1265                Self::handle_event_ideal_send_buffer_size(stream, inner, unsafe {
1266                    &event.payload.ideal_send_buffer_size
1267                })
1268            }
1269            msquic::STREAM_EVENT_PEER_ACCEPTED => Self::handle_event_peer_accepted(stream, inner),
1270            _ => {
1271                trace!("Stream({:p}) Other callback {}", inner, event.event_type);
1272                msquic::QUIC_STATUS_SUCCESS
1273            }
1274        }
1275    }
1276}
1277
/// Drop hook used purely for diagnostics.
impl Drop for StreamInner {
    /// Emits a trace line with the address of the value being dropped.
    fn drop(&mut self) {
        trace!("StreamInner({:p}) dropping", self);
    }
}
1283
1284impl fmt::Debug for StreamInnerExclusive {
1285    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1286        f.debug_struct("Exclusive")
1287            .field("state", &self.state)
1288            .field("recv_state", &self.recv_state)
1289            .field("send_state", &self.send_state)
1290            .finish()
1291    }
1292}
1293
1294impl fmt::Debug for StreamInnerShared {
1295    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1296        f.debug_struct("Shared")
1297            .field("type", &self.stream_type)
1298            .field("id", &self.id)
1299            .finish()
1300    }
1301}
/// Future that reads the next chunk from a borrowed stream.
pub struct ReadChunk<'a> {
    /// Stream whose `poll_read_chunk` is driven by this future.
    stream: &'a StreamInstance,
}

impl Future for ReadChunk<'_> {
    /// Whatever `poll_read_chunk` yields: an optional received buffer, or a `ReadError`.
    type Output = Result<Option<StreamRecvBuffer>, ReadError>;

    /// Delegates directly to `StreamInstance::poll_read_chunk`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.stream.poll_read_chunk(cx)
    }
}
1313
/// Future that writes a single `Bytes` chunk to a borrowed stream.
pub struct WriteChunk<'a> {
    /// Stream whose `poll_write_chunk` is driven by this future.
    stream: &'a StreamInstance,
    /// Chunk forwarded to `poll_write_chunk`.
    chunk: &'a Bytes,
    /// Flag forwarded to `poll_write_chunk`; presumably marks the end of the
    /// stream — confirm against `poll_write_chunk`.
    fin: bool,
}

impl Future for WriteChunk<'_> {
    /// Whatever `poll_write_chunk` yields: a `usize` on success, or a `WriteError`.
    type Output = Result<usize, WriteError>;

    /// Delegates directly to `StreamInstance::poll_write_chunk`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.stream.poll_write_chunk(cx, self.chunk, self.fin)
    }
}
1327
/// Future that writes a slice of `Bytes` chunks to a borrowed stream.
pub struct WriteChunks<'a> {
    /// Stream whose `poll_write_chunks` is driven by this future.
    stream: &'a StreamInstance,
    /// Chunks forwarded to `poll_write_chunks`.
    chunks: &'a [Bytes],
    /// Flag forwarded to `poll_write_chunks`; presumably marks the end of the
    /// stream — confirm against `poll_write_chunks`.
    fin: bool,
}

impl Future for WriteChunks<'_> {
    /// Whatever `poll_write_chunks` yields: a `usize` on success, or a `WriteError`.
    type Output = Result<usize, WriteError>;

    /// Delegates directly to `StreamInstance::poll_write_chunks`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.stream.poll_write_chunks(cx, self.chunks, self.fin)
    }
}
1341
#[cfg(feature = "tokio")]
impl tokio::io::AsyncRead for Stream {
    /// Reads into the unfilled part of `buf` and advances its filled cursor
    /// by the number of bytes read.
    ///
    /// The data must go into the *unfilled* region: writing into the
    /// initialized region and then calling `set_filled(len)` would overwrite
    /// bytes the caller has already filled, and would hand an empty slice to
    /// the inner read whenever `buf` has no initialized bytes yet.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // Initializes the remaining capacity and returns it as `&mut [u8]`.
        let unfilled = buf.initialize_unfilled();
        let len = ready!(Self::poll_read(self.get_mut(), cx, unfilled))?;
        buf.advance(len);
        Poll::Ready(Ok(()))
    }
}
1354
#[cfg(feature = "tokio")]
impl tokio::io::AsyncWrite for Stream {
    /// Forwards to the inherent `poll_write` with `fin` set to `false`,
    /// converting the error into `std::io::Error`.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        match Self::poll_write(self.get_mut(), cx, buf, false) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Ok(written)) => Poll::Ready(Ok(written)),
            Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
        }
    }

    /// Nothing to flush at this layer; always ready.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<std::io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    /// Forwards to `poll_finish_write`, converting the error into `std::io::Error`.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<std::io::Result<()>> {
        match Self::poll_finish_write(self.get_mut(), cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
            Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
        }
    }
}
1375
#[cfg(feature = "tokio")]
impl tokio::io::AsyncRead for ReadStream {
    /// Reads into the unfilled part of `buf` and advances its filled cursor
    /// by the number of bytes read.
    ///
    /// The data must go into the *unfilled* region: writing into the
    /// initialized region and then calling `set_filled(len)` would overwrite
    /// bytes the caller has already filled, and would hand an empty slice to
    /// the inner read whenever `buf` has no initialized bytes yet.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // Initializes the remaining capacity and returns it as `&mut [u8]`.
        let unfilled = buf.initialize_unfilled();
        let len = ready!(Self::poll_read(self.get_mut(), cx, unfilled))?;
        buf.advance(len);
        Poll::Ready(Ok(()))
    }
}
1388
#[cfg(feature = "tokio")]
impl tokio::io::AsyncWrite for WriteStream {
    /// Forwards to the inherent `poll_write` with `fin` set to `false`,
    /// converting the error into `std::io::Error`.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        match Self::poll_write(self.get_mut(), cx, buf, false) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Ok(written)) => Poll::Ready(Ok(written)),
            Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
        }
    }

    /// Nothing to flush at this layer; always ready.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<std::io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    /// Forwards to `poll_finish_write`, converting the error into `std::io::Error`.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<std::io::Result<()>> {
        match Self::poll_finish_write(self.get_mut(), cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
            Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
        }
    }
}
1409
1410impl futures_io::AsyncRead for Stream {
1411    fn poll_read(
1412        self: Pin<&mut Self>,
1413        cx: &mut Context<'_>,
1414        buf: &mut [u8],
1415    ) -> Poll<std::io::Result<usize>> {
1416        let len = ready!(Self::poll_read(self.get_mut(), cx, buf))?;
1417        Poll::Ready(Ok(len))
1418    }
1419}
1420
1421impl futures_io::AsyncWrite for Stream {
1422    fn poll_write(
1423        self: Pin<&mut Self>,
1424        cx: &mut Context<'_>,
1425        buf: &[u8],
1426    ) -> Poll<std::io::Result<usize>> {
1427        let len = ready!(Self::poll_write(self.get_mut(), cx, buf, false))?;
1428        Poll::Ready(Ok(len))
1429    }
1430
1431    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1432        Poll::Ready(Ok(()))
1433    }
1434
1435    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1436        ready!(Self::poll_finish_write(self.get_mut(), cx))?;
1437        Poll::Ready(Ok(()))
1438    }
1439}
1440
1441impl futures_io::AsyncRead for ReadStream {
1442    fn poll_read(
1443        self: Pin<&mut Self>,
1444        cx: &mut Context<'_>,
1445        buf: &mut [u8],
1446    ) -> Poll<std::io::Result<usize>> {
1447        let len = ready!(Self::poll_read(self.get_mut(), cx, buf))?;
1448        Poll::Ready(Ok(len))
1449    }
1450}
1451
1452impl futures_io::AsyncWrite for WriteStream {
1453    fn poll_write(
1454        self: Pin<&mut Self>,
1455        cx: &mut Context<'_>,
1456        buf: &[u8],
1457    ) -> Poll<std::io::Result<usize>> {
1458        let len = ready!(Self::poll_write(self.get_mut(), cx, buf, false))?;
1459        Poll::Ready(Ok(len))
1460    }
1461
1462    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1463        Poll::Ready(Ok(()))
1464    }
1465
1466    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1467        ready!(Self::poll_finish_write(self.get_mut(), cx))?;
1468        Poll::Ready(Ok(()))
1469    }
1470}
1471
/// Outcome of a read attempt, optionally carrying a value of type `T`.
enum ReadStatus<T> {
    /// A value is available.
    Readable(T),
    /// The operation is finished, optionally with a value.
    Finished(Option<T>),
    /// The operation cannot proceed yet, optionally with a value.
    Blocked(Option<T>),
}

/// Converts an `(optional value, finished flag)` pair: a `true` flag yields
/// `Finished`, a `false` flag yields `Blocked`.
impl<T> From<(Option<T>, bool)> for ReadStatus<T> {
    fn from(status: (Option<T>, bool)) -> Self {
        let (value, finished) = status;
        if finished {
            Self::Finished(value)
        } else {
            Self::Blocked(value)
        }
    }
}
1486
/// Outcome of a write attempt, optionally carrying a value of type `T`.
enum WriteStatus<T> {
    /// Writing can proceed with the given value.
    Writable(T),
    /// The operation is finished, optionally with a value.
    Finished(Option<T>),
    /// The operation cannot proceed yet, optionally with a value.
    Blocked(Option<T>),
}

/// Converts an `(optional value, finished flag)` pair: a `true` flag yields
/// `Finished`, a `false` flag yields `Blocked`.
impl<T> From<(Option<T>, bool)> for WriteStatus<T> {
    fn from(status: (Option<T>, bool)) -> Self {
        let (value, finished) = status;
        if finished {
            Self::Finished(value)
        } else {
            Self::Blocked(value)
        }
    }
}
1501
/// Errors reported when opening or starting a stream.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum StartError {
    /// The connection has not been started yet.
    #[error("connection not started yet")]
    ConnectionNotStarted,
    /// The stream count limit has been reached.
    #[error("reach stream count limit")]
    LimitReached,
    /// The connection was lost; wraps the underlying `ConnectionError`.
    #[error("connection lost")]
    ConnectionLost(#[from] ConnectionError),
    /// Any other failure, carrying the raw status code.
    #[error("other error: status 0x{0:x}")]
    OtherError(u32),
}
1513
/// Errors reported when reading from a stream.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ReadError {
    /// The stream is not open for reading.
    #[error("stream not opened for reading")]
    Closed,
    /// The peer reset the stream; carries the peer's error code.
    #[error("stream reset by peer: error {0}")]
    Reset(u64),
    /// The connection was lost; wraps the underlying `ConnectionError`.
    #[error("connection lost")]
    ConnectionLost(#[from] ConnectionError),
    /// Any other failure, carrying the raw status code.
    #[error("other error: status 0x{0:x}")]
    OtherError(u32),
}
1525
1526impl From<ReadError> for std::io::Error {
1527    fn from(e: ReadError) -> Self {
1528        let kind = match e {
1529            ReadError::Closed => std::io::ErrorKind::NotConnected,
1530            ReadError::Reset(_) => std::io::ErrorKind::ConnectionReset,
1531            ReadError::ConnectionLost(ConnectionError::ConnectionClosed) => {
1532                std::io::ErrorKind::NotConnected
1533            }
1534            ReadError::ConnectionLost(_) => std::io::ErrorKind::ConnectionAborted,
1535            ReadError::OtherError(_) => std::io::ErrorKind::Other,
1536        };
1537        Self::new(kind, e)
1538    }
1539}
1540
/// Errors reported when writing to a stream.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum WriteError {
    /// The stream is not open for writing.
    #[error("stream not opened for writing")]
    Closed,
    /// The stream has already been finished.
    #[error("stream finished")]
    Finished,
    /// The peer stopped the stream; carries the peer's error code.
    #[error("stream stopped by peer: error {0}")]
    Stopped(u64),
    /// The connection was lost; wraps the underlying `ConnectionError`.
    #[error("connection lost")]
    ConnectionLost(#[from] ConnectionError),
    /// Any other failure, carrying the raw status code.
    #[error("other error: status 0x{0:x}")]
    OtherError(u32),
}
1554
1555impl From<WriteError> for std::io::Error {
1556    fn from(e: WriteError) -> Self {
1557        let kind = match e {
1558            WriteError::Closed
1559            | WriteError::Finished
1560            | WriteError::ConnectionLost(ConnectionError::ConnectionClosed) => {
1561                std::io::ErrorKind::NotConnected
1562            }
1563            WriteError::Stopped(_) => std::io::ErrorKind::ConnectionReset,
1564            WriteError::ConnectionLost(_) => std::io::ErrorKind::ConnectionAborted,
1565            WriteError::OtherError(_) => std::io::ErrorKind::Other,
1566        };
1567        Self::new(kind, e)
1568    }
1569}