Skip to main content

ntex_io/
io.rs

1use std::cell::{Cell, UnsafeCell};
2use std::future::{Future, poll_fn};
3use std::task::{Context, Poll};
4use std::{fmt, hash, io, marker, mem, ops, pin::Pin, ptr, rc::Rc};
5
6use ntex_bytes::{BytePageSize, BytesMut};
7use ntex_codec::{Decoder, Encoder};
8use ntex_service::cfg::{Cfg, SharedCfg};
9use ntex_util::{future::Either, task::LocalWaker, time::Sleep};
10
11use crate::buf::Stack;
12use crate::cfg::IoConfig;
13use crate::ctx::IoContext;
14use crate::filter::{Base, Filter, Layer};
15use crate::filterptr::FilterPtr;
16use crate::flags::Flags;
17use crate::ops::{Id, IoManager, TimerHandle};
18use crate::seal::{IoBoxed, Sealed};
19use crate::{Decoded, FilterLayer, Handle, IoStatusUpdate, IoStream, RecvError};
20
21/// Interface object to the underlying I/O stream
22pub struct Io<F = Base>(UnsafeCell<IoRef>, marker::PhantomData<F>);
23
24#[derive(Clone)]
25pub struct IoRef(pub(super) Rc<IoState>);
26
27pub(crate) struct IoState {
28    filter: FilterPtr,
29    pub(super) id: Cell<Id>,
30    pub(super) cfg: Cfg<IoConfig>,
31    pub(super) flags: Flags,
32    pub(super) error: Cell<Option<io::Error>>,
33    pub(super) read_task: LocalWaker,
34    pub(super) write_task: LocalWaker,
35    dispatch_task: LocalWaker,
36    pub(super) buffer: Stack,
37    pub(super) handle: Cell<Option<Box<dyn Handle>>>,
38    pub(super) timeout: Cell<TimerHandle>,
39    pub(super) shutdown_timeout: Cell<Option<Sleep>>,
40    #[allow(clippy::box_collection)]
41    pub(super) on_disconnect: Cell<Option<Box<Vec<LocalWaker>>>>,
42}
43
44impl IoState {
45    pub(super) fn id(&self) -> Id {
46        self.id.get()
47    }
48
49    pub(super) fn tag(&self) -> &'static str {
50        self.cfg.tag()
51    }
52
53    pub(super) fn filter(&self) -> &dyn Filter {
54        self.filter.get()
55    }
56
57    pub(super) fn notify_timeout(&self) {
58        if self.flags.check_dispatcher_timeout_unset() {
59            self.wake_dispatch_task();
60            log::trace!("{}: Timer, notify dispatcher", self.cfg.tag());
61        }
62    }
63
64    pub(super) fn notify_disconnect(&self) {
65        if let Some(on_disconnect) = self.on_disconnect.take() {
66            for item in on_disconnect.into_iter() {
67                item.wake();
68            }
69        }
70    }
71
72    /// Get the current I/O error.
73    pub(super) fn error(&self) -> Option<io::Error> {
74        if let Some(err) = self.error.take() {
75            self.error
76                .set(Some(io::Error::new(err.kind(), format!("{err}"))));
77            Some(err)
78        } else {
79            None
80        }
81    }
82
83    /// Returns the current I/O error, or creates a `NotConnected` error.
84    pub(super) fn error_or_disconnected(&self) -> io::Error {
85        self.error()
86            .unwrap_or_else(|| io::Error::new(io::ErrorKind::NotConnected, "Disconnected"))
87    }
88
89    pub(super) fn filters_stopped(&self) {
90        self.wake_read_task();
91        self.wake_write_task();
92        self.wake_dispatch_task();
93        self.flags.set_filters_stopped();
94    }
95
96    pub(super) fn terminate_connection(&self, err: Option<io::Error>) {
97        if !self.flags.is_terminated() {
98            log::trace!("{}: Terminate io with error {:?}", self.cfg.tag(), err);
99            if err.is_some() {
100                self.error.set(err);
101            }
102            self.flags.set_terminate();
103            self.wake_read_task();
104            self.wake_write_task();
105            self.wake_dispatch_task();
106            self.notify_disconnect();
107            self.handle.take();
108        }
109    }
110
111    /// Gracefully shuts down the read and write I/O tasks.
112    pub(super) fn start_shutdown(&self) {
113        if !self.flags.is_stopping_any() {
114            log::trace!("{}: Initiate io shutdown {:?}", self.cfg.tag(), self.flags);
115            self.flags.set_filter_stopping();
116            self.wake_read_task();
117            self.wake_write_task();
118        }
119    }
120
121    pub(super) fn get_read_buf(&self) -> BytesMut {
122        self.cfg.read_buf().get()
123    }
124
125    pub(super) fn is_rd_backpressure_needed(&self, size: usize) -> bool {
126        size >= self.cfg.read_buf().high
127    }
128
129    pub(super) fn is_wr_backpressure_needed(&self, size: usize) -> bool {
130        size >= self.cfg.write_buf().high
131    }
132
133    pub(super) fn should_disable_wr_backpressure(&self, size: usize) -> bool {
134        size <= self.cfg.write_buf().half
135    }
136
137    pub(super) fn wake_read_task(&self) {
138        self.read_task.wake();
139    }
140
141    pub(super) fn wake_write_task(&self) {
142        #[cfg(feature = "trace")]
143        log::trace!("{}: Wake write task, flags:{:?}", self.tag(), self.flags);
144        self.write_task.wake();
145    }
146
147    pub(super) fn wake_dispatch_task(&self) {
148        self.dispatch_task.wake();
149    }
150}
151
152impl Eq for IoState {}
153
154impl PartialEq for IoState {
155    #[inline]
156    fn eq(&self, other: &Self) -> bool {
157        ptr::eq(self, other)
158    }
159}
160
161impl hash::Hash for IoState {
162    #[inline]
163    fn hash<H: hash::Hasher>(&self, state: &mut H) {
164        (ptr::from_ref(self) as usize).hash(state);
165    }
166}
167
168impl fmt::Debug for IoState {
169    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
170        let err = self.error.take();
171        let res = f
172            .debug_struct("IoState")
173            .field("id", &self.id)
174            .field("flags", &self.flags)
175            .field("filter", &self.filter.is_set())
176            .field("timeout", &self.timeout)
177            .field("error", &err)
178            .field("buffer", &self.buffer)
179            .field("cfg", &self.cfg)
180            .finish();
181        self.error.set(err);
182        res
183    }
184}
185
186impl Io {
187    /// Creates a new `Io` instance.
188    pub fn new<I: IoStream, T: Into<SharedCfg>>(io: I, cfg: T) -> Self {
189        let cfg = cfg.into().get::<IoConfig>();
190        let size = cfg.write_page_size();
191        let flags = Flags::new(cfg.write_buf_threshold() > 0);
192
193        let inner = Rc::new(IoState {
194            cfg,
195            flags,
196            id: Cell::new(Id::default()),
197            filter: FilterPtr::null(),
198            error: Cell::new(None),
199            dispatch_task: LocalWaker::new(),
200            read_task: LocalWaker::new(),
201            write_task: LocalWaker::new(),
202            buffer: Stack::new(size),
203            handle: Cell::new(None),
204            timeout: Cell::new(TimerHandle::default()),
205            shutdown_timeout: Cell::new(None),
206            on_disconnect: Cell::new(None),
207        });
208        inner.filter.set(Base::new(IoRef(inner.clone())));
209
210        let ioref = IoRef(inner);
211        ioref.0.id.set(IoManager::register(&ioref));
212
213        // start io tasks
214        let hnd = io.start(IoContext::new(ioref.clone()));
215        ioref.0.handle.set(Some(hnd));
216
217        Io(UnsafeCell::new(ioref), marker::PhantomData)
218    }
219}
220
221impl<I: IoStream> From<I> for Io {
222    #[inline]
223    fn from(io: I) -> Io {
224        Io::new(io, SharedCfg::default())
225    }
226}
227
228impl IoRef {
229    fn create_empty() -> IoRef {
230        IoRef(Rc::new(IoState {
231            id: Cell::new(Id::default()),
232            cfg: SharedCfg::default().get::<IoConfig>(),
233            filter: FilterPtr::null(),
234            flags: Flags::new_stopped(),
235            error: Cell::new(None),
236            dispatch_task: LocalWaker::new(),
237            read_task: LocalWaker::new(),
238            write_task: LocalWaker::new(),
239            buffer: Stack::new(BytePageSize::Size16),
240            handle: Cell::new(None),
241            timeout: Cell::new(TimerHandle::default()),
242            shutdown_timeout: Cell::new(None),
243            on_disconnect: Cell::new(None),
244        }))
245    }
246}
247
248impl<F> Io<F> {
249    #[inline]
250    /// Get an instance of `IoRef`.
251    pub fn get_ref(&self) -> IoRef {
252        self.io_ref().clone()
253    }
254
255    #[inline]
256    #[must_use]
257    /// Takes the current I/O object.
258    ///
259    /// After this call, the I/O object is no longer valid for use.
260    pub fn take(&self) -> Self {
261        Self(UnsafeCell::new(self.take_io_ref()), marker::PhantomData)
262    }
263
264    fn take_io_ref(&self) -> IoRef {
265        unsafe { mem::replace(&mut *self.0.get(), IoRef::create_empty()) }
266    }
267
268    fn st(&self) -> &IoState {
269        unsafe { &(*self.0.get()).0 }
270    }
271
272    fn io_ref(&self) -> &IoRef {
273        unsafe { &*self.0.get() }
274    }
275
276    #[inline]
277    /// Updates the shared I/O configuration.
278    pub fn set_config<T: Into<SharedCfg>>(&self, cfg: T) {
279        unsafe {
280            let cfg = cfg.into().get::<IoConfig>();
281            self.st().buffer.set_page_size(cfg.write_page_size());
282            self.st().cfg.replace(cfg);
283        }
284    }
285}
286
287impl<F: FilterLayer, T: Filter> Io<Layer<F, T>> {
288    #[inline]
289    /// Returns a reference to a filter.
290    pub fn filter(&self) -> &F {
291        &self.st().filter.filter::<Layer<F, T>>().0
292    }
293}
294
295impl<F: Filter> Io<F> {
296    #[inline]
297    /// Convert the current I/O stream into a sealed version.
298    pub fn seal(self) -> Io<Sealed> {
299        let state = self.take_io_ref();
300        state.0.filter.seal::<F>();
301
302        Io(UnsafeCell::new(state), marker::PhantomData)
303    }
304
305    #[inline]
306    /// Convert the current I/O stream into a boxed version.
307    pub fn boxed(self) -> IoBoxed {
308        self.seal().into()
309    }
310
311    #[inline]
312    /// Adds a new processing layer to the current filter chain.
313    pub fn add_filter<U>(self, nf: U) -> Io<Layer<U, F>>
314    where
315        U: FilterLayer,
316    {
317        // Write buffer processing may be delayed,
318        // call the filter chain to process pending writes
319        if let Err(e) = self.st().buffer.process_write_buf(&self) {
320            self.st().terminate_connection(Some(e));
321        }
322
323        let state = self.take_io_ref();
324
325        // Add the buffers layer.
326        //
327        // Safety: no references into the buffer storage are retained.
328        // All APIs first remove the buffer from storage before processing it.
329        unsafe { &mut *(Rc::as_ptr(&state.0).cast_mut()) }
330            .buffer
331            .add_layer(self.st().cfg.write_page_size());
332
333        // Replace current filter
334        state.0.filter.add_filter::<F, U>(nf);
335
336        if let Err(e) = self.st().buffer.process_read_buf(&self, 0) {
337            self.st().terminate_connection(Some(e));
338        }
339
340        Io(UnsafeCell::new(state), marker::PhantomData)
341    }
342
343    /// Wraps the current layer with a wrapper.
344    pub fn map_filter<U, R>(self, f: U) -> Io<R>
345    where
346        U: FnOnce(F) -> R,
347        R: Filter,
348    {
349        // Write buffer processing may be delayed,
350        // call the filter chain to process pending writes
351        if let Err(e) = self.st().buffer.process_write_buf(&self) {
352            self.st().terminate_connection(Some(e));
353        }
354
355        let state = self.take_io_ref();
356        state.0.filter.map_filter::<F, U, R>(f);
357
358        Io(UnsafeCell::new(state), marker::PhantomData)
359    }
360}
361
362impl<F> Io<F> {
363    /// Reads from the incoming I/O stream and decodes a codec item.
364    pub async fn recv<U>(
365        &self,
366        codec: &U,
367    ) -> Result<Option<U::Item>, Either<U::Error, io::Error>>
368    where
369        U: Decoder,
370    {
371        loop {
372            return match poll_fn(|cx| self.poll_recv(codec, cx)).await {
373                Ok(item) => Ok(Some(item)),
374                Err(RecvError::KeepAlive) => Err(Either::Right(io::Error::new(
375                    io::ErrorKind::TimedOut,
376                    "Timeout",
377                ))),
378                Err(RecvError::WriteBackpressure) => {
379                    poll_fn(|cx| self.poll_flush(cx, false))
380                        .await
381                        .map_err(Either::Right)?;
382                    continue;
383                }
384                Err(RecvError::Decoder(err)) => Err(Either::Left(err)),
385                Err(RecvError::PeerGone(Some(err))) => Err(Either::Right(err)),
386                Err(RecvError::PeerGone(None)) => Ok(None),
387            };
388        }
389    }
390
391    /// Reads bytes from this I/O stream into the specified buffer.
392    ///
393    /// If there is not enough data available, waits for incoming data.
394    pub async fn read(&self, dst: &mut [u8]) -> io::Result<()> {
395        loop {
396            let completed = self.with_read_buf(|buf| {
397                if buf.len() >= dst.len() {
398                    let _ = io::Read::read(buf, dst).expect("Cannot fail");
399                    true
400                } else {
401                    false
402                }
403            });
404            if completed {
405                return Ok(());
406            }
407            self.read_ready().await?;
408        }
409    }
410
411    #[inline]
412    /// Waits until the I/O stream is ready for reading.
413    pub async fn read_ready(&self) -> io::Result<Option<()>> {
414        poll_fn(|cx| self.poll_read_ready(cx)).await
415    }
416
417    #[inline]
418    /// Waits until the I/O stream receives new data.
419    pub async fn read_notify(&self) -> io::Result<Option<()>> {
420        poll_fn(|cx| self.poll_read_notify(cx)).await
421    }
422
423    #[inline]
424    /// Pauses the read task.
425    pub fn pause(&self) {
426        let st = self.st();
427        if !st.flags.is_read_paused() {
428            st.wake_read_task();
429            st.flags.set_read_paused();
430        }
431    }
432
433    #[inline]
434    /// Encodes an item and sends it to the peer, fully flushing the write buffer.
435    pub async fn send<U>(
436        &self,
437        item: U::Item,
438        codec: &U,
439    ) -> Result<(), Either<U::Error, io::Error>>
440    where
441        U: Encoder,
442    {
443        self.encode(item, codec).map_err(Either::Left)?;
444
445        poll_fn(|cx| self.poll_flush(cx, true))
446            .await
447            .map_err(Either::Right)?;
448
449        Ok(())
450    }
451
452    #[inline]
453    /// Wakes the write task and requests a flush of buffered data.
454    ///
455    /// This is the async version of `.poll_flush()` method.
456    pub async fn flush(&self, full: bool) -> io::Result<()> {
457        poll_fn(|cx| self.poll_flush(cx, full)).await
458    }
459
460    #[inline]
461    /// Gracefully shuts down the I/O stream.
462    pub async fn shutdown(&self) -> io::Result<()> {
463        poll_fn(|cx| self.poll_shutdown(cx)).await
464    }
465
466    #[inline]
467    /// Polls for read readiness.
468    ///
469    /// If the I/O stream is not currently ready for reading,
470    /// this method will store a clone of the `Waker` from the provided `Context`.
471    /// When the I/O stream becomes ready for reading, `wake()` will be called on the waker.
472    ///
473    /// # Returns
474    ///
475    /// The function returns:
476    ///
477    /// - `Poll::Pending` if the I/O stream is not ready for reading.
478    /// - `Poll::Ready(Ok(Some(())))` if the I/O stream is ready for reading.
479    /// - `Poll::Ready(Ok(None))` if the I/O stream is disconnected.
480    /// - `Poll::Ready(Err(e))` if an error is encountered.
481    pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Option<()>>> {
482        let st = self.st();
483
484        if st.flags.is_closed() {
485            Poll::Ready(Ok(None))
486        } else {
487            let ready = st.flags.is_read_ready();
488
489            // If the dispatcher requests more data but no read occurs,
490            // restart the read task.
491            if st.flags.is_read_paused_or_backpressure() {
492                st.flags.unset_all_read_flags();
493                st.flags.unset_read_paused();
494                st.wake_read_task();
495                if ready {
496                    Poll::Ready(Ok(Some(())))
497                } else {
498                    st.dispatch_task.register(cx.waker());
499                    Poll::Pending
500                }
501            } else if ready {
502                Poll::Ready(Ok(Some(())))
503            } else {
504                if st.flags.is_read_paused() {
505                    st.wake_read_task();
506                    st.flags.unset_read_paused();
507                }
508                st.dispatch_task.register(cx.waker());
509                Poll::Pending
510            }
511        }
512    }
513
514    #[inline]
515    /// Polls the I/O stream for availability of incoming data.
516    pub fn poll_read_notify(&self, cx: &mut Context<'_>) -> Poll<io::Result<Option<()>>> {
517        let st = self.st();
518        if st.flags.is_stopping() {
519            Poll::Ready(Ok(None))
520        } else if st.flags.check_read_notifed() {
521            Poll::Ready(Ok(Some(())))
522        } else {
523            st.flags.set_read_notify();
524            if self.poll_read_ready(cx).is_ready() {
525                st.dispatch_task.register(cx.waker());
526            }
527            st.dispatch_task.register(cx.waker());
528            Poll::Pending
529        }
530    }
531
532    #[inline]
533    /// Decode codec item from incoming bytes stream.
534    ///
535    /// Wake read task and request to read more data if data is not enough for decoding.
536    /// If error get returned this method does not register waker for later wake up action.
537    pub fn poll_recv<U>(
538        &self,
539        codec: &U,
540        cx: &mut Context<'_>,
541    ) -> Poll<Result<U::Item, RecvError<U>>>
542    where
543        U: Decoder,
544    {
545        let decoded = self.poll_recv_decode(codec, cx)?;
546
547        if let Some(item) = decoded.item {
548            Poll::Ready(Ok(item))
549        } else {
550            Poll::Pending
551        }
552    }
553
554    #[inline]
555    /// Decode codec item from incoming bytes stream.
556    ///
557    /// Wake read task and request to read more data if data is not enough for decoding.
558    /// If error get returned this method does not register waker for later wake up action.
559    pub fn poll_recv_decode<U>(
560        &self,
561        codec: &U,
562        cx: &mut Context<'_>,
563    ) -> Result<Decoded<U::Item>, RecvError<U>>
564    where
565        U: Decoder,
566    {
567        let st = self.st();
568        st.flags.unset_read_ready();
569
570        let decoded = self
571            .decode_item(codec)
572            .map_err(|err| RecvError::Decoder(err))?;
573
574        if decoded.item.is_some() {
575            Ok(decoded)
576        } else if st.flags.is_stopping() {
577            Err(RecvError::PeerGone(st.error()))
578        } else if st.flags.check_dispatcher_timeout() {
579            Err(RecvError::KeepAlive)
580        } else if st.flags.is_wr_backpressure() {
581            Err(RecvError::WriteBackpressure)
582        } else {
583            match self.poll_read_ready(cx) {
584                Poll::Pending | Poll::Ready(Ok(Some(()))) => {
585                    #[cfg(feature = "trace")]
586                    if decoded.remains != 0 {
587                        log::trace!("{}: Not enough data to decode next frame", self.tag());
588                    }
589                    Ok(decoded)
590                }
591                Poll::Ready(Err(e)) => Err(RecvError::PeerGone(Some(e))),
592                Poll::Ready(Ok(None)) => Err(RecvError::PeerGone(None)),
593            }
594        }
595    }
596
597    #[inline]
598    /// Wakes the write task and instructs it to flush data.
599    ///
600    /// If `full` is true, wakes the dispatcher when all data has been flushed;
601    /// otherwise, it wakes when the write buffer size falls below the low-watermark size.
602    pub fn poll_flush(&self, cx: &mut Context<'_>, full: bool) -> Poll<io::Result<()>> {
603        let st = self.st();
604
605        // flush filter state
606        st.buffer.process_write_buf_force(self)?;
607        self.consolidate_write_state(false);
608
609        let len = st.buffer.write_buf_size();
610        if len > 0 {
611            if st.flags.is_closed() {
612                return Poll::Ready(Err(st.error_or_disconnected()));
613            } else if full {
614                st.flags.set_wants_write_flush();
615                st.dispatch_task.register(cx.waker());
616                return Poll::Pending;
617            } else if st.is_wr_backpressure_needed(len) {
618                st.flags.set_wr_backpressure();
619                st.dispatch_task.register(cx.waker());
620                return Poll::Pending;
621            }
622        }
623        if st.flags.is_closed() && !st.flags.is_write_flush() {
624            Poll::Ready(Err(st.error_or_disconnected()))
625        } else {
626            st.flags.unset_wr_backpressure_and_flush();
627            Poll::Ready(Ok(()))
628        }
629    }
630
631    #[inline]
632    /// Gracefully shuts down the I/O stream.
633    pub fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
634        let st = self.st();
635
636        if st.flags.is_stopping() {
637            if let Some(err) = st.error() {
638                Poll::Ready(Err(err))
639            } else {
640                Poll::Ready(Ok(()))
641            }
642        } else {
643            if !st.flags.is_stopping_filters() {
644                st.start_shutdown();
645            }
646            st.flags.unset_all_read_flags();
647            st.flags.unset_read_paused();
648
649            st.wake_read_task();
650            st.dispatch_task.register(cx.waker());
651            Poll::Pending
652        }
653    }
654
655    #[inline]
656    /// Pauses the read task.
657    ///
658    /// Returns status updates.
659    pub fn poll_read_pause(&self, cx: &mut Context<'_>) -> Poll<IoStatusUpdate> {
660        self.pause();
661        self.poll_status_update(cx)
662    }
663
664    #[inline]
665    /// Polls for available status updates.
666    pub fn poll_status_update(&self, cx: &mut Context<'_>) -> Poll<IoStatusUpdate> {
667        let st = self.st();
668        st.dispatch_task.register(cx.waker());
669        if st.flags.is_closed() {
670            Poll::Ready(IoStatusUpdate::PeerGone(st.error()))
671        } else if st.flags.check_dispatcher_timeout() {
672            Poll::Ready(IoStatusUpdate::KeepAlive)
673        } else if st.flags.is_wr_backpressure() {
674            // write backpressure is enabled and write buf smaller than half
675            if st.should_disable_wr_backpressure(st.buffer.write_buf_size()) {
676                st.flags.unset_wr_backpressure();
677            }
678            Poll::Ready(IoStatusUpdate::WriteBackpressure)
679        } else {
680            Poll::Pending
681        }
682    }
683
684    #[inline]
685    /// Registers a dispatch task.
686    pub fn poll_dispatch(&self, cx: &mut Context<'_>) {
687        self.st().dispatch_task.register(cx.waker());
688    }
689}
690
691impl<F> AsRef<IoRef> for Io<F> {
692    #[inline]
693    fn as_ref(&self) -> &IoRef {
694        self.io_ref()
695    }
696}
697
698impl<F> Eq for Io<F> {}
699
700impl<F> PartialEq for Io<F> {
701    #[inline]
702    fn eq(&self, other: &Self) -> bool {
703        self.io_ref().eq(other.io_ref())
704    }
705}
706
707impl<F> hash::Hash for Io<F> {
708    #[inline]
709    fn hash<H: hash::Hasher>(&self, state: &mut H) {
710        self.io_ref().hash(state);
711    }
712}
713
714impl<F> fmt::Debug for Io<F> {
715    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
716        f.debug_struct("Io").field("state", self.st()).finish()
717    }
718}
719
720impl<F> ops::Deref for Io<F> {
721    type Target = IoRef;
722
723    #[inline]
724    fn deref(&self) -> &Self::Target {
725        self.io_ref()
726    }
727}
728
729impl<F> Drop for Io<F> {
730    fn drop(&mut self) {
731        let st = self.st();
732        self.stop_timer();
733
734        if st.filter.is_set() {
735            // filter is unsafe and must be dropped explicitly,
736            // and won't be dropped without special attention
737            if !st.flags.is_terminated() {
738                log::trace!("{}: Io is dropped, terminate connection", st.tag());
739            }
740
741            st.terminate_connection(None);
742            st.filter.drop_filter::<F>();
743        }
744
745        IoManager::unregister(self.io_ref());
746    }
747}
748
749#[derive(Debug)]
750/// The `OnDisconnect` future resolves when the I/O stream is disconnected.
751#[must_use = "OnDisconnect do nothing unless polled"]
752pub struct OnDisconnect {
753    token: usize,
754    inner: Rc<IoState>,
755}
756
757impl OnDisconnect {
758    pub(super) fn new(inner: Rc<IoState>) -> Self {
759        Self::new_inner(inner.flags.is_stopping(), inner)
760    }
761
762    fn new_inner(disconnected: bool, inner: Rc<IoState>) -> Self {
763        let token = if disconnected {
764            usize::MAX
765        } else {
766            let mut on_disconnect = inner.on_disconnect.take();
767            let token = if let Some(ref mut on_disconnect) = on_disconnect {
768                let token = on_disconnect.len();
769                on_disconnect.push(LocalWaker::default());
770                token
771            } else {
772                on_disconnect = Some(Box::new(vec![LocalWaker::default()]));
773                0
774            };
775            inner.on_disconnect.set(on_disconnect);
776            token
777        };
778        Self { token, inner }
779    }
780
781    #[inline]
782    /// Checks if the I/O stream is disconnected.
783    pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
784        if self.token == usize::MAX || self.inner.flags.is_stopping() {
785            Poll::Ready(())
786        } else if let Some(on_disconnect) = self.inner.on_disconnect.take() {
787            on_disconnect[self.token].register(cx.waker());
788            self.inner.on_disconnect.set(Some(on_disconnect));
789            Poll::Pending
790        } else {
791            Poll::Ready(())
792        }
793    }
794}
795
796impl Clone for OnDisconnect {
797    fn clone(&self) -> Self {
798        if self.token == usize::MAX {
799            OnDisconnect::new_inner(true, self.inner.clone())
800        } else {
801            OnDisconnect::new_inner(false, self.inner.clone())
802        }
803    }
804}
805
806impl Future for OnDisconnect {
807    type Output = ();
808
809    #[inline]
810    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
811        self.poll_ready(cx)
812    }
813}
814
815#[cfg(test)]
816mod tests {
817    use ntex_bytes::{BufMut, BytePages, Bytes, BytesMut};
818    use ntex_codec::BytesCodec;
819    use ntex_util::{future::lazy, time::Millis, time::sleep};
820
821    use super::*;
822    use crate::{
823        FilterBuf, IoContext, IoTaskStatus, Readiness, ops::Iops, testing::IoTest,
824    };
825
826    const BIN: &[u8] = b"GET /test HTTP/1\r\n\r\n";
827    const TEXT: &str = "GET /test HTTP/1\r\n\r\n";
828    const BIN2: &[u8] = b"12345678901234561234567890123456";
829
830    #[ntex::test]
831    async fn test_basics() {
832        let (client, server) = IoTest::create();
833        client.remote_buffer_cap(1024);
834
835        let server = Io::from(server);
836        assert!(server.eq(&server));
837        assert!(server.io_ref().eq(server.io_ref()));
838    }
839
840    #[ntex::test]
841    async fn test_recv() {
842        let (client, server) = IoTest::create();
843        client.remote_buffer_cap(1024);
844
845        let server = Io::new(server, SharedCfg::new("SRV"));
846
847        server.st().notify_timeout();
848        let err = server.recv(&BytesCodec).await.err().unwrap();
849        assert!(format!("{err:?}").contains("Timeout"));
850
851        client.write(TEXT);
852        server.st().flags.set_wr_backpressure();
853        let item = server.recv(&BytesCodec).await.ok().unwrap().unwrap();
854        assert_eq!(item, TEXT);
855    }
856
857    #[ntex::test]
858    async fn test_send() {
859        let (client, server) = IoTest::create();
860        client.remote_buffer_cap(1024);
861
862        let server = Io::from(server);
863        assert!(server.eq(&server));
864
865        server
866            .send(Bytes::from_static(BIN), &BytesCodec)
867            .await
868            .ok()
869            .unwrap();
870        let item = client.read_any();
871        assert_eq!(item, TEXT);
872    }
873
874    #[ntex::test]
875    async fn read() {
876        let io = Io::new(
877            IoTest::create().0,
878            SharedCfg::new("SRV").add(IoConfig::default().set_read_buf(8, 4, 16)),
879        );
880        assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_pending());
881        assert!(io.st().dispatch_task.is_set());
882
883        let ctx = IoContext::new(io.get_ref());
884
885        // Ready
886        assert_eq!(
887            lazy(|cx| ctx.poll_read_ready(cx)).await,
888            Poll::Ready(Readiness::Ready)
889        );
890        assert!(io.st().read_task.is_set());
891        assert!(!io.st().flags.is_read_ready());
892        assert!(!io.st().flags.is_rd_backpressure());
893
894        // == Enable backpressure
895        ctx.update_read_status(BytesMut::copy_from_slice(b"1234567890"), Ok(10));
896
897        // dispatcher is woken
898        assert!(!io.st().dispatch_task.is_set());
899        // read task is paused
900        assert!(io.st().flags.is_read_paused());
901        // read buffer is ready
902        assert!(io.st().flags.is_read_ready());
903        // read backpressure is enabled
904        assert!(io.st().flags.is_rd_backpressure());
905        // read task paused
906        assert_eq!(lazy(|cx| ctx.poll_read_ready(cx)).await, Poll::Pending);
907
908        // read one byte
909        assert_eq!(io.with_read_buf(|buf| buf.split_to(1)), b"1");
910        // read buffer is ready
911        assert!(io.st().flags.is_read_ready());
912        // read backpressure is enabled
913        assert!(io.st().flags.is_rd_backpressure());
914
915        // read task is set
916        assert!(io.st().read_task.is_set());
917
918        // read one more byte
919        assert_eq!(io.with_read_buf(|buf| buf.split_to(1)), b"2");
920        // read backpressure is enabled
921        assert!(io.st().flags.is_rd_backpressure());
922
923        // read 4 bytes. buf size is 4, less that half of high watermark
924        assert_eq!(io.with_read_buf(|buf| buf.split_to(4)), b"3456");
925        // read task is not paused anymore
926        assert!(!io.st().flags.is_read_paused());
927        // read buffer is not ready
928        assert!(!io.st().flags.is_read_ready());
929        // read backpressure is disabled
930        assert!(!io.st().flags.is_rd_backpressure());
931        // read task is woken
932        assert!(!io.st().read_task.is_set());
933        assert_eq!(
934            lazy(|cx| ctx.poll_read_ready(cx)).await,
935            Poll::Ready(Readiness::Ready)
936        );
937
938        // register dispatcher task
939        lazy(|cx| io.poll_dispatch(cx)).await;
940
941        // == Enable backpressure, 4 bytes in buffer + 4 more
942        ctx.update_read_status(BytesMut::copy_from_slice(b"1234"), Ok(4));
943
944        // dispatcher is woken
945        assert!(!io.st().dispatch_task.is_set());
946        // read task is paused
947        assert!(io.st().flags.is_read_paused());
948        // read buffer is ready
949        assert!(io.st().flags.is_read_ready());
950        // read backpressure is enabled
951        assert!(io.st().flags.is_rd_backpressure());
952        // read task paused
953        assert_eq!(lazy(|cx| ctx.poll_read_ready(cx)).await, Poll::Pending);
954
955        // read 4 bytes. buf size is 4, less that half of high watermark
956        assert_eq!(io.with_read_buf(|buf| buf.split_to(4)), b"7890");
957        // read backpressure is disabled
958        assert!(!io.st().flags.is_rd_backpressure());
959
960        // register dispatcher task
961        lazy(|cx| io.poll_dispatch(cx)).await;
962
963        // == No backpressure, 4 bytes in buffer + 3 more
964        ctx.update_read_status(BytesMut::copy_from_slice(b"567"), Ok(3));
965
966        // read task is paused
967        assert!(!io.st().flags.is_read_paused());
968        // read buffer is ready
969        assert!(io.st().flags.is_read_ready());
970        // read backpressure is enabled
971        assert!(!io.st().flags.is_rd_backpressure());
972        // read task ready
973        assert_eq!(
974            lazy(|cx| ctx.poll_read_ready(cx)).await,
975            Poll::Ready(Readiness::Ready)
976        );
977
978        // read 4 bytes. buf size is 4, less that half of high watermark
979        assert_eq!(io.with_read_buf(BytesMut::take), b"1234567");
980        // read task is paused
981        assert!(!io.st().flags.is_read_paused());
982        // read buffer is ready
983        assert!(!io.st().flags.is_read_ready());
984        // read task is not woken
985        assert!(io.st().read_task.is_set());
986
987        // == Terminate
988        io.terminate();
989        // read task is woken
990        assert!(!io.st().read_task.is_set());
991        // read task ready
992        assert_eq!(
993            lazy(|cx| ctx.poll_read_ready(cx)).await,
994            Poll::Ready(Readiness::Terminate)
995        );
996    }
997
998    #[ntex::test]
999    async fn read_notify() {
1000        let io = Io::new(
1001            IoTest::create().0,
1002            SharedCfg::new("SRV").add(IoConfig::default().set_read_buf(8, 4, 16)),
1003        );
1004        assert!(!io.st().flags.is_read_notify());
1005        assert!(lazy(|cx| io.poll_read_notify(cx)).await.is_pending());
1006        assert!(io.st().dispatch_task.is_set());
1007        assert!(io.st().flags.is_read_notify());
1008
1009        let ctx = IoContext::new(io.get_ref());
1010
1011        // incoming bytes
1012        ctx.update_read_status(BytesMut::copy_from_slice(b"1"), Ok(1));
1013
1014        assert!(!io.st().dispatch_task.is_set());
1015        // rd buffer is ready
1016        assert!(io.st().flags.is_read_ready());
1017        assert!(io.st().flags.is_read_notify());
1018        // dispatcher is notified
1019        assert!(io.st().flags.is_read_notified());
1020        let res = lazy(|cx| io.poll_read_notify(cx)).await;
1021        assert!(matches!(res, Poll::Ready(Ok(Some(())))));
1022
1023        // disapcher is not set
1024        assert!(!io.st().dispatch_task.is_set());
1025        // rd buffer is ready
1026        assert!(io.st().flags.is_read_ready());
1027
1028        // == start notification process again
1029        assert!(lazy(|cx| io.poll_read_notify(cx)).await.is_pending());
1030        assert!(io.st().dispatch_task.is_set());
1031        assert!(io.st().flags.is_read_notify());
1032        assert!(io.st().flags.is_read_ready());
1033        // read task ready
1034        assert_eq!(
1035            lazy(|cx| ctx.poll_read_ready(cx)).await,
1036            Poll::Ready(Readiness::Ready)
1037        );
1038
1039        // == enable packpressure
1040        ctx.update_read_status(BytesMut::copy_from_slice(b"2345678"), Ok(7));
1041        // read backpressure is enabled
1042        assert!(io.st().flags.is_rd_backpressure());
1043
1044        // rd buffer is ready
1045        assert!(io.st().flags.is_read_ready());
1046        assert!(io.st().flags.is_read_notify());
1047        // dispatcher is notified
1048        assert!(io.st().flags.is_read_notified());
1049        let res = lazy(|cx| io.poll_read_notify(cx)).await;
1050        assert!(matches!(res, Poll::Ready(Ok(Some(())))));
1051        // read task paused
1052        assert_eq!(lazy(|cx| ctx.poll_read_ready(cx)).await, Poll::Pending);
1053        // read task is set
1054        assert!(io.st().read_task.is_set());
1055
1056        // == start notification process again
1057        assert!(lazy(|cx| io.poll_read_notify(cx)).await.is_pending());
1058        // read flags active
1059        assert!(!io.st().flags.is_rd_backpressure());
1060        assert!(!io.st().flags.is_read_ready());
1061        assert!(!io.st().flags.is_read_paused());
1062        // read task is woken
1063        assert!(!io.st().read_task.is_set());
1064        // read task ready
1065        assert_eq!(
1066            lazy(|cx| ctx.poll_read_ready(cx)).await,
1067            Poll::Ready(Readiness::Ready)
1068        );
1069
1070        // incoming bytes
1071        ctx.update_read_status(BytesMut::copy_from_slice(b"1"), Ok(1));
1072        assert!(!io.st().dispatch_task.is_set());
1073        // rd buffer is ready
1074        assert!(io.st().flags.is_read_ready());
1075        assert!(io.st().flags.is_read_notify());
1076        assert!(io.st().flags.is_read_paused());
1077        assert!(io.st().flags.is_rd_backpressure());
1078        // dispatcher is notified
1079        assert!(io.st().flags.is_read_notified());
1080        assert!(matches!(
1081            lazy(|cx| io.poll_read_notify(cx)).await,
1082            Poll::Ready(Ok(Some(())))
1083        ));
1084
1085        // == Terminate
1086        io.terminate();
1087        let res = lazy(|cx| io.poll_read_notify(cx)).await;
1088        assert!(matches!(res, Poll::Ready(Ok(None))), "{res:?}");
1089    }
1090
1091    #[ntex::test]
1092    async fn read_readiness() {
1093        let (client, server) = IoTest::create();
1094        client.remote_buffer_cap(1024);
1095
1096        let io = Io::from(server);
1097        assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_pending());
1098
1099        client.write(TEXT);
1100        assert_eq!(io.read_ready().await.unwrap(), Some(()));
1101        assert!(matches!(
1102            lazy(|cx| io.poll_read_ready(cx)).await,
1103            Poll::Ready(Ok(Some(())))
1104        ));
1105
1106        let item = io.with_read_buf(BytesMut::take);
1107        assert_eq!(item, Bytes::from_static(BIN));
1108
1109        client.write(TEXT);
1110        sleep(Millis(50)).await;
1111        assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_ready());
1112        assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_ready());
1113    }
1114
1115    #[ntex::test]
1116    async fn read_backpressure() {
1117        let (client, server) = IoTest::create();
1118
1119        let io = Io::new(
1120            server,
1121            SharedCfg::new("SRV").add(IoConfig::default().set_read_buf(64, 32, 12)),
1122        );
1123        assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_pending());
1124
1125        client.write(BIN2);
1126        client.write(BIN2);
1127        sleep(Millis(50)).await;
1128        assert!(io.flags().is_read_ready());
1129        assert!(io.flags().is_rd_backpressure());
1130        let _item = io.recv(&BytesCodec).await.ok().unwrap().unwrap();
1131        assert!(!io.flags().is_read_ready());
1132        assert!(!io.flags().is_rd_backpressure());
1133
1134        client.write(BIN2);
1135        client.write(BIN2);
1136        sleep(Millis(50)).await;
1137        assert!(io.flags().is_read_ready());
1138        assert!(io.flags().is_rd_backpressure());
1139        assert_eq!(io.read_ready().await.unwrap(), Some(()));
1140    }
1141
1142    #[ntex::test]
1143    async fn write() {
1144        let io = Io::new(
1145            IoTest::create().0,
1146            SharedCfg::new("SRV").add(IoConfig::default().set_write_buf(8, 4, 16)),
1147        );
1148        assert!(lazy(|cx| io.poll_status_update(cx)).await.is_pending());
1149        assert!(io.st().dispatch_task.is_set());
1150        assert!(io.st().flags.is_direct_wr_enabled());
1151
1152        let ctx = IoContext::new(io.get_ref());
1153
1154        // == No write work
1155        assert_eq!(lazy(|cx| ctx.poll_write_ready(cx)).await, Poll::Pending);
1156        assert!(io.st().write_task.is_set());
1157        assert!(io.st().flags.is_write_paused());
1158        assert!(!io.st().flags.is_wr_backpressure());
1159
1160        // write
1161        io.with_write_buf(|buf| buf.put_slice(b"1234")).unwrap();
1162        assert_eq!(lazy(|cx| ctx.poll_write_ready(cx)).await, Poll::Pending);
1163        // write task is paused
1164        assert!(io.st().flags.is_write_paused());
1165        // send-buf op is scheduled
1166        assert!(io.st().flags.is_wr_send_scheduled());
1167        // back-pressure is not enabled
1168        assert!(!io.st().flags.is_wr_backpressure());
1169        // dispatch is not woken up
1170        assert!(io.st().dispatch_task.is_set());
1171
1172        // == enable wr backpressure
1173        io.with_write_buf(|buf| buf.put_slice(b"5678")).unwrap();
1174        // back-pressure is enabled
1175        assert!(io.st().flags.is_wr_backpressure());
1176        // dispatch is woken up
1177        assert!(!io.st().dispatch_task.is_set());
1178        // write task is set
1179        assert!(io.st().write_task.is_set());
1180        // dispatcher gets WriteBackpressure
1181        assert!(matches!(
1182            lazy(|cx| io.poll_status_update(cx)).await,
1183            Poll::Ready(IoStatusUpdate::WriteBackpressure)
1184        ));
1185        // flush write buffer
1186        assert!(lazy(|cx| io.poll_flush(cx, false)).await.is_pending());
1187        // full flush is not enabled
1188        assert!(!io.st().flags.is_write_flush());
1189
1190        // run send-buf ops
1191        Iops::run();
1192        // send-buf op is not scheduled
1193        assert!(!io.st().flags.is_wr_send_scheduled());
1194        // write task is not paused
1195        assert!(!io.st().flags.is_write_paused());
1196        // write task has been woken up
1197        assert!(!io.st().write_task.is_set());
1198        // write task can proceed
1199        assert_eq!(
1200            lazy(|cx| ctx.poll_write_ready(cx)).await,
1201            Poll::Ready(Readiness::Ready)
1202        );
1203
1204        // wrote 4 bytes to io
1205        assert_eq!(ctx.with_write_buf(|buf| buf.split_to(4).freeze()), b"1234");
1206        // continue to write
1207        assert_eq!(ctx.update_write_status(Ok(true)), IoTaskStatus::Io);
1208        // write task can proceed
1209        assert_eq!(
1210            lazy(|cx| ctx.poll_write_ready(cx)).await,
1211            Poll::Ready(Readiness::Ready)
1212        );
1213        // write task is not paused
1214        assert!(!io.st().flags.is_write_paused());
1215        // back-pressure is enabled
1216        assert!(io.st().flags.is_wr_backpressure());
1217        // dispatcher gets WriteBackpressure, buf wr-backpressure flags is removed
1218        assert!(matches!(
1219            lazy(|cx| io.poll_status_update(cx)).await,
1220            Poll::Ready(IoStatusUpdate::WriteBackpressure)
1221        ));
1222        // back-pressure is disabled
1223        assert!(!io.st().flags.is_wr_backpressure());
1224        assert!(lazy(|cx| io.poll_status_update(cx)).await.is_pending());
1225        // write buffer is flushed
1226        assert!(matches!(
1227            lazy(|cx| io.poll_flush(cx, false)).await,
1228            Poll::Ready(Ok(()))
1229        ));
1230
1231        // full flush write buffer
1232        io.with_write_buf(|buf| buf.put_slice(b"1234")).unwrap();
1233        assert!(lazy(|cx| io.poll_flush(cx, true)).await.is_pending());
1234        // full flush is enabled
1235        assert!(io.st().flags.is_write_flush());
1236        // back-pressure is enabled
1237        assert!(io.st().flags.is_wr_backpressure());
1238
1239        // wrote all data
1240        Iops::run();
1241        assert_eq!(ctx.with_write_buf(BytePages::freeze), b"56781234");
1242        // write task is not paused, so send-buf op is not scheduled
1243        assert!(!io.st().flags.is_wr_send_scheduled());
1244        // update status, no more work
1245        assert_eq!(ctx.update_write_status(Ok(true)), IoTaskStatus::Pause);
1246        // write task is paused
1247        assert!(io.st().flags.is_write_paused());
1248        // flush is still enabled
1249        assert!(io.st().flags.is_write_flush());
1250        // back-pressure is still enabled
1251        assert!(io.st().flags.is_wr_backpressure());
1252        // dispatch is woken up
1253        assert!(!io.st().dispatch_task.is_set());
1254
1255        // write buffer is flushed
1256        assert!(matches!(
1257            lazy(|cx| io.poll_flush(cx, false)).await,
1258            Poll::Ready(Ok(()))
1259        ));
1260        // full flush is disabled
1261        assert!(!io.st().flags.is_write_flush());
1262        // back-pressure is disabled
1263        assert!(!io.st().flags.is_wr_backpressure());
1264
1265        // == Terminate
1266        io.terminate();
1267        // read task is woken
1268        assert!(!io.st().write_task.is_set());
1269        // write task ready
1270        assert_eq!(
1271            lazy(|cx| ctx.poll_write_ready(cx)).await,
1272            Poll::Ready(Readiness::Terminate)
1273        );
1274        // flush returns error
1275        let Poll::Ready(Err(err)) = lazy(|cx| io.poll_flush(cx, false)).await else {
1276            panic!()
1277        };
1278        assert_eq!(err.kind(), io::ErrorKind::NotConnected);
1279        // statis returns error
1280        assert!(matches!(
1281            lazy(|cx| io.poll_status_update(cx)).await,
1282            Poll::Ready(IoStatusUpdate::PeerGone(None))
1283        ));
1284    }
1285
1286    #[ntex::test]
1287    async fn write_backpressure() {
1288        let (client, server) = IoTest::create();
1289        client.remote_buffer_cap(0);
1290
1291        let io = Io::new(
1292            server,
1293            SharedCfg::new("SRV").add(IoConfig::default().set_write_buf(16, 8, 12)),
1294        );
1295        assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_pending());
1296        assert!(io.flags().is_write_paused());
1297        assert!(!io.flags().is_wr_backpressure());
1298        assert!(!io.is_wr_backpressure());
1299
1300        io.encode_slice(BIN2).unwrap();
1301        assert!(Iops::is_registered(&io));
1302        assert!(io.flags().is_wr_backpressure());
1303
1304        client.remote_buffer_cap(1024);
1305        let item = client.read().await.unwrap();
1306        assert_eq!(item, BIN2);
1307        assert!(io.flags().is_wr_backpressure());
1308        assert!(matches!(
1309            lazy(|cx| io.poll_status_update(cx)).await,
1310            Poll::Ready(IoStatusUpdate::WriteBackpressure)
1311        ));
1312        assert!(!io.flags().is_wr_backpressure());
1313        assert!(matches!(
1314            lazy(|cx| io.poll_flush(cx, false)).await,
1315            Poll::Ready(Ok(()))
1316        ));
1317        assert!(!io.flags().is_wr_backpressure());
1318    }
1319
1320    #[ntex::test]
1321    async fn shutdown() {
1322        // layer drops all unprocessed data after filter shutdown
1323        #[derive(Debug)]
1324        struct F;
1325
1326        impl FilterLayer for F {
1327            fn process_read_buf(&self, _: &FilterBuf<'_>) -> io::Result<()> {
1328                Ok(())
1329            }
1330            fn process_write_buf(&self, _: &FilterBuf<'_>) -> io::Result<()> {
1331                Ok(())
1332            }
1333        }
1334
1335        let io = Io::new(
1336            IoTest::create().0,
1337            SharedCfg::new("SRV").add(IoConfig::default().set_write_buf(8, 4, 16)),
1338        );
1339        let st = io.st();
1340        assert!(lazy(|cx| io.poll_status_update(cx)).await.is_pending());
1341        assert!(st.dispatch_task.is_set());
1342        assert!(!st.flags.is_closed());
1343        assert!(!st.flags.is_stopping_filters());
1344
1345        let ctx = IoContext::new(io.get_ref());
1346
1347        // == init shutdown
1348        io.close();
1349        assert!(!st.flags.is_closed());
1350        assert!(st.flags.is_stopping_filters());
1351        // encoding is not allowed in shutting down stage
1352        let err = io.with_write_buf(|_| 1).unwrap_err();
1353        assert_eq!(err.kind(), io::ErrorKind::Other);
1354
1355        let io = io.add_filter(F);
1356        let layer = Layer::new(F, Base::new(io.get_ref()));
1357
1358        let st = io.st();
1359        st.buffer.with_write_src(|p| p.put_slice(b"123"));
1360        assert_eq!(st.buffer.write_buf_size(), 3);
1361        let res = st.buffer.with_filter(io.as_ref(), |f| layer.shutdown(f));
1362        assert!(matches!(res, Ok(Poll::Ready(()))));
1363        assert_eq!(st.buffer.write_buf_size(), 0);
1364
1365        // == terminate
1366        ctx.stop(None);
1367        assert!(st.flags.is_closed());
1368        assert!(st.flags.is_terminated());
1369        assert!(st.flags.is_stopping_filters());
1370
1371        let err = io.with_write_buf(|_| 1).unwrap_err();
1372        assert_eq!(err.kind(), io::ErrorKind::NotConnected);
1373    }
1374}