Skip to main content

ntex_io/
io.rs

1use std::cell::{Cell, UnsafeCell};
2use std::future::{Future, poll_fn};
3use std::task::{Context, Poll};
4use std::{fmt, hash, io, marker, mem, ops, pin::Pin, ptr, rc::Rc};
5
6use ntex_bytes::{BytePageSize, BytesMut};
7use ntex_codec::{Decoder, Encoder};
8use ntex_service::cfg::{Cfg, SharedCfg};
9use ntex_util::{future::Either, task::LocalWaker, time::Sleep};
10
11use crate::buf::Stack;
12use crate::cfg::IoConfig;
13use crate::ctx::IoContext;
14use crate::filter::{Base, Filter, Layer};
15use crate::filterptr::FilterPtr;
16use crate::flags::Flags;
17use crate::ops::{Id, IoManager, TimerHandle};
18use crate::seal::{IoBoxed, Sealed};
19use crate::{Decoded, FilterLayer, Handle, IoStatusUpdate, IoStream, RecvError};
20
21/// Interface object to the underlying I/O stream
22pub struct Io<F = Base>(UnsafeCell<IoRef>, marker::PhantomData<F>);
23
24#[derive(Clone)]
25pub struct IoRef(pub(super) Rc<IoState>);
26
27pub(crate) struct IoState {
28    filter: FilterPtr,
29    pub(super) id: Cell<Id>,
30    pub(super) cfg: Cfg<IoConfig>,
31    pub(super) flags: Flags,
32    pub(super) error: Cell<Option<io::Error>>,
33    pub(super) read_task: LocalWaker,
34    pub(super) write_task: LocalWaker,
35    dispatch_task: LocalWaker,
36    pub(super) buffer: Stack,
37    pub(super) handle: Cell<Option<Box<dyn Handle>>>,
38    pub(super) timeout: Cell<TimerHandle>,
39    pub(super) shutdown_timeout: Cell<Option<Sleep>>,
40    #[allow(clippy::box_collection)]
41    pub(super) on_disconnect: Cell<Option<Box<Vec<LocalWaker>>>>,
42}
43
44impl IoState {
45    pub(super) fn id(&self) -> Id {
46        self.id.get()
47    }
48
49    pub(super) fn tag(&self) -> &'static str {
50        self.cfg.tag()
51    }
52
53    pub(super) fn filter(&self) -> &dyn Filter {
54        self.filter.get()
55    }
56
57    pub(super) fn notify_timeout(&self) {
58        if self.flags.check_dispatcher_timeout_unset() {
59            self.wake_dispatch_task();
60            log::trace!("{}: Timer, notify dispatcher", self.cfg.tag());
61        }
62    }
63
64    pub(super) fn notify_disconnect(&self) {
65        if let Some(on_disconnect) = self.on_disconnect.take() {
66            for item in on_disconnect.into_iter() {
67                item.wake();
68            }
69        }
70    }
71
72    /// Get the current I/O error.
73    pub(super) fn error(&self) -> Option<io::Error> {
74        if let Some(err) = self.error.take() {
75            self.error
76                .set(Some(io::Error::new(err.kind(), format!("{err}"))));
77            Some(err)
78        } else {
79            None
80        }
81    }
82
83    /// Returns the current I/O error, or creates a `NotConnected` error.
84    pub(super) fn error_or_disconnected(&self) -> io::Error {
85        self.error()
86            .unwrap_or_else(|| io::Error::new(io::ErrorKind::NotConnected, "Disconnected"))
87    }
88
89    pub(super) fn filters_stopped(&self) {
90        self.wake_read_task();
91        self.wake_write_task();
92        self.wake_dispatch_task();
93        self.flags.set_filters_stopped();
94    }
95
96    pub(super) fn terminate_connection(&self, err: Option<io::Error>) {
97        if !self.flags.is_terminated() {
98            log::trace!("{}: Terminate io with error {:?}", self.cfg.tag(), err);
99            if err.is_some() {
100                self.error.set(err);
101            }
102            self.flags.set_terminate();
103            self.wake_read_task();
104            self.wake_write_task();
105            self.wake_dispatch_task();
106            self.notify_disconnect();
107            self.handle.take();
108        }
109    }
110
111    /// Gracefully shuts down the read and write I/O tasks.
112    pub(super) fn start_shutdown(&self) {
113        if !self.flags.is_stopping_any() {
114            log::trace!("{}: Initiate io shutdown {:?}", self.cfg.tag(), self.flags);
115            self.flags.set_filter_stopping();
116            self.wake_read_task();
117            self.wake_write_task();
118        }
119    }
120
121    pub(super) fn get_read_buf(&self) -> BytesMut {
122        self.cfg.read_buf().get()
123    }
124
125    pub(super) fn is_rd_backpressure_needed(&self, size: usize) -> bool {
126        size >= self.cfg.read_buf().high
127    }
128
129    pub(super) fn is_wr_backpressure_needed(&self, size: usize) -> bool {
130        size >= self.cfg.write_buf().high
131    }
132
133    pub(super) fn should_disable_wr_backpressure(&self, size: usize) -> bool {
134        size <= self.cfg.write_buf().half
135    }
136
137    pub(super) fn wake_read_task(&self) {
138        self.read_task.wake();
139    }
140
141    pub(super) fn wake_write_task(&self) {
142        #[cfg(feature = "trace")]
143        log::trace!("{}: Wake write task, flags:{:?}", self.tag(), self.flags);
144        self.write_task.wake();
145    }
146
147    pub(super) fn wake_dispatch_task(&self) {
148        self.dispatch_task.wake();
149    }
150}
151
152impl Eq for IoState {}
153
154impl PartialEq for IoState {
155    #[inline]
156    fn eq(&self, other: &Self) -> bool {
157        ptr::eq(self, other)
158    }
159}
160
161impl hash::Hash for IoState {
162    #[inline]
163    fn hash<H: hash::Hasher>(&self, state: &mut H) {
164        (ptr::from_ref(self) as usize).hash(state);
165    }
166}
167
168impl fmt::Debug for IoState {
169    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
170        let err = self.error.take();
171        let res = f
172            .debug_struct("IoState")
173            .field("id", &self.id)
174            .field("flags", &self.flags)
175            .field("filter", &self.filter.is_set())
176            .field("timeout", &self.timeout)
177            .field("error", &err)
178            .field("buffer", &self.buffer)
179            .field("cfg", &self.cfg)
180            .finish();
181        self.error.set(err);
182        res
183    }
184}
185
186impl Io {
187    /// Creates a new `Io` instance.
188    pub fn new<I: IoStream, T: Into<SharedCfg>>(io: I, cfg: T) -> Self {
189        let cfg = cfg.into().get::<IoConfig>();
190        let size = cfg.write_page_size();
191        let flags = Flags::new(cfg.write_buf_threshold() > 0);
192
193        let inner = Rc::new(IoState {
194            cfg,
195            flags,
196            id: Cell::new(Id::default()),
197            filter: FilterPtr::null(),
198            error: Cell::new(None),
199            dispatch_task: LocalWaker::new(),
200            read_task: LocalWaker::new(),
201            write_task: LocalWaker::new(),
202            buffer: Stack::new(size),
203            handle: Cell::new(None),
204            timeout: Cell::new(TimerHandle::default()),
205            shutdown_timeout: Cell::new(None),
206            on_disconnect: Cell::new(None),
207        });
208        inner.filter.set(Base::new(IoRef(inner.clone())));
209
210        let ioref = IoRef(inner);
211        ioref.0.id.set(IoManager::register(&ioref));
212
213        // start io tasks
214        let hnd = io.start(IoContext::new(ioref.clone()));
215        ioref.0.handle.set(Some(hnd));
216
217        Io(UnsafeCell::new(ioref), marker::PhantomData)
218    }
219}
220
221impl<I: IoStream> From<I> for Io {
222    #[inline]
223    fn from(io: I) -> Io {
224        Io::new(io, SharedCfg::default())
225    }
226}
227
228impl IoRef {
229    fn create_empty() -> IoRef {
230        IoRef(Rc::new(IoState {
231            id: Cell::new(Id::default()),
232            cfg: SharedCfg::default().get::<IoConfig>(),
233            filter: FilterPtr::null(),
234            flags: Flags::new_stopped(),
235            error: Cell::new(None),
236            dispatch_task: LocalWaker::new(),
237            read_task: LocalWaker::new(),
238            write_task: LocalWaker::new(),
239            buffer: Stack::new(BytePageSize::Size16),
240            handle: Cell::new(None),
241            timeout: Cell::new(TimerHandle::default()),
242            shutdown_timeout: Cell::new(None),
243            on_disconnect: Cell::new(None),
244        }))
245    }
246}
247
248impl<F> Io<F> {
249    #[inline]
250    /// Get an instance of `IoRef`.
251    pub fn get_ref(&self) -> IoRef {
252        self.io_ref().clone()
253    }
254
255    #[inline]
256    #[must_use]
257    /// Takes the current I/O object.
258    ///
259    /// After this call, the I/O object is no longer valid for use.
260    pub fn take(&self) -> Self {
261        Self(UnsafeCell::new(self.take_io_ref()), marker::PhantomData)
262    }
263
264    fn take_io_ref(&self) -> IoRef {
265        unsafe { mem::replace(&mut *self.0.get(), IoRef::create_empty()) }
266    }
267
268    fn st(&self) -> &IoState {
269        unsafe { &(*self.0.get()).0 }
270    }
271
272    fn io_ref(&self) -> &IoRef {
273        unsafe { &*self.0.get() }
274    }
275
276    #[inline]
277    /// Updates the shared I/O configuration.
278    pub fn set_config<T: Into<SharedCfg>>(&self, cfg: T) {
279        unsafe {
280            let cfg = cfg.into().get::<IoConfig>();
281            self.st().buffer.set_page_size(cfg.write_page_size());
282            self.st().cfg.replace(cfg);
283        }
284    }
285}
286
287impl<F: FilterLayer, T: Filter> Io<Layer<F, T>> {
288    #[inline]
289    /// Returns a reference to a filter.
290    pub fn filter(&self) -> &F {
291        &self.st().filter.filter::<Layer<F, T>>().0
292    }
293}
294
295impl<F: Filter> Io<F> {
296    #[inline]
297    /// Convert the current I/O stream into a sealed version.
298    pub fn seal(self) -> Io<Sealed> {
299        let state = self.take_io_ref();
300        state.0.filter.seal::<F>();
301
302        Io(UnsafeCell::new(state), marker::PhantomData)
303    }
304
305    #[inline]
306    /// Convert the current I/O stream into a boxed version.
307    pub fn boxed(self) -> IoBoxed {
308        self.seal().into()
309    }
310
311    #[inline]
312    /// Adds a new processing layer to the current filter chain.
313    pub fn add_filter<U>(self, nf: U) -> Io<Layer<U, F>>
314    where
315        U: FilterLayer,
316    {
317        // Write buffer processing may be delayed,
318        // call the filter chain to process pending writes
319        if let Err(e) = self.st().buffer.process_write_buf(&self) {
320            self.st().terminate_connection(Some(e));
321        }
322
323        let state = self.take_io_ref();
324
325        // Add the buffers layer.
326        //
327        // Safety: no references into the buffer storage are retained.
328        // All APIs first remove the buffer from storage before processing it.
329        unsafe { &mut *(Rc::as_ptr(&state.0).cast_mut()) }
330            .buffer
331            .add_layer(state.0.cfg.write_page_size());
332
333        // Replace current filter
334        state.0.filter.add_filter::<F, U>(nf);
335
336        let io = Io(UnsafeCell::new(state), marker::PhantomData);
337
338        // push read data into new filter
339        if let Err(e) = io.st().buffer.process_read_buf(&io, 0) {
340            io.st().terminate_connection(Some(e));
341        }
342        io
343    }
344
345    /// Wraps the current layer with a wrapper.
346    pub fn map_filter<U, R>(self, f: U) -> Io<R>
347    where
348        U: FnOnce(F) -> R,
349        R: Filter,
350    {
351        // Write buffer processing may be delayed,
352        // call the filter chain to process pending writes
353        if let Err(e) = self.st().buffer.process_write_buf(&self) {
354            self.st().terminate_connection(Some(e));
355        }
356
357        let state = self.take_io_ref();
358        state.0.filter.map_filter::<F, U, R>(f);
359
360        Io(UnsafeCell::new(state), marker::PhantomData)
361    }
362}
363
364impl<F> Io<F> {
365    /// Reads from the incoming I/O stream and decodes a codec item.
366    pub async fn recv<U>(
367        &self,
368        codec: &U,
369    ) -> Result<Option<U::Item>, Either<U::Error, io::Error>>
370    where
371        U: Decoder,
372    {
373        loop {
374            return match poll_fn(|cx| self.poll_recv(codec, cx)).await {
375                Ok(item) => Ok(Some(item)),
376                Err(RecvError::KeepAlive) => Err(Either::Right(io::Error::new(
377                    io::ErrorKind::TimedOut,
378                    "Timeout",
379                ))),
380                Err(RecvError::WriteBackpressure) => {
381                    poll_fn(|cx| self.poll_flush(cx, false))
382                        .await
383                        .map_err(Either::Right)?;
384                    continue;
385                }
386                Err(RecvError::Decoder(err)) => Err(Either::Left(err)),
387                Err(RecvError::PeerGone(Some(err))) => Err(Either::Right(err)),
388                Err(RecvError::PeerGone(None)) => Ok(None),
389            };
390        }
391    }
392
393    /// Reads bytes from this I/O stream into the specified buffer.
394    ///
395    /// If there is not enough data available, waits for incoming data.
396    pub async fn read(&self, dst: &mut [u8]) -> io::Result<()> {
397        loop {
398            let completed = self.with_read_buf(|buf| {
399                if buf.len() >= dst.len() {
400                    let _ = io::Read::read(buf, dst).expect("Cannot fail");
401                    true
402                } else {
403                    false
404                }
405            });
406            if completed {
407                return Ok(());
408            }
409            self.read_ready().await?;
410        }
411    }
412
413    #[inline]
414    /// Waits until the I/O stream is ready for reading.
415    pub async fn read_ready(&self) -> io::Result<Option<()>> {
416        poll_fn(|cx| self.poll_read_ready(cx)).await
417    }
418
419    #[inline]
420    /// Waits until the I/O stream receives new data.
421    pub async fn read_notify(&self) -> io::Result<Option<()>> {
422        poll_fn(|cx| self.poll_read_notify(cx)).await
423    }
424
425    #[inline]
426    /// Pauses the read task.
427    pub fn pause(&self) {
428        let st = self.st();
429        if !st.flags.is_read_paused() {
430            st.wake_read_task();
431            st.flags.set_read_paused();
432        }
433    }
434
435    #[inline]
436    /// Encodes an item and sends it to the peer, fully flushing the write buffer.
437    pub async fn send<U>(
438        &self,
439        item: U::Item,
440        codec: &U,
441    ) -> Result<(), Either<U::Error, io::Error>>
442    where
443        U: Encoder,
444    {
445        self.encode(item, codec).map_err(Either::Left)?;
446
447        poll_fn(|cx| self.poll_flush(cx, true))
448            .await
449            .map_err(Either::Right)?;
450
451        Ok(())
452    }
453
454    #[inline]
455    /// Wakes the write task and requests a flush of buffered data.
456    ///
457    /// This is the async version of `.poll_flush()` method.
458    pub async fn flush(&self, full: bool) -> io::Result<()> {
459        poll_fn(|cx| self.poll_flush(cx, full)).await
460    }
461
462    #[inline]
463    /// Gracefully shuts down the I/O stream.
464    pub async fn shutdown(&self) -> io::Result<()> {
465        poll_fn(|cx| self.poll_shutdown(cx)).await
466    }
467
468    #[inline]
469    /// Polls for read readiness.
470    ///
471    /// If the I/O stream is not currently ready for reading,
472    /// this method will store a clone of the `Waker` from the provided `Context`.
473    /// When the I/O stream becomes ready for reading, `wake()` will be called on the waker.
474    ///
475    /// # Returns
476    ///
477    /// The function returns:
478    ///
479    /// - `Poll::Pending` if the I/O stream is not ready for reading.
480    /// - `Poll::Ready(Ok(Some(())))` if the I/O stream is ready for reading.
481    /// - `Poll::Ready(Ok(None))` if the I/O stream is disconnected.
482    /// - `Poll::Ready(Err(e))` if an error is encountered.
483    pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Option<()>>> {
484        let st = self.st();
485
486        if st.flags.is_closed() {
487            Poll::Ready(Ok(None))
488        } else {
489            let ready = st.flags.is_read_ready();
490
491            // If the dispatcher requests more data but no read occurs,
492            // restart the read task.
493            if st.flags.is_read_paused_or_backpressure() {
494                st.flags.unset_all_read_flags();
495                st.flags.unset_read_paused();
496                st.wake_read_task();
497                if ready {
498                    Poll::Ready(Ok(Some(())))
499                } else {
500                    st.dispatch_task.register(cx.waker());
501                    Poll::Pending
502                }
503            } else if ready {
504                Poll::Ready(Ok(Some(())))
505            } else {
506                if st.flags.is_read_paused() {
507                    st.wake_read_task();
508                    st.flags.unset_read_paused();
509                }
510                st.dispatch_task.register(cx.waker());
511                Poll::Pending
512            }
513        }
514    }
515
516    #[inline]
517    /// Polls the I/O stream for availability of incoming data.
518    pub fn poll_read_notify(&self, cx: &mut Context<'_>) -> Poll<io::Result<Option<()>>> {
519        let st = self.st();
520        if st.flags.is_stopping() {
521            Poll::Ready(Ok(None))
522        } else if st.flags.check_read_notifed() {
523            Poll::Ready(Ok(Some(())))
524        } else {
525            st.flags.set_read_notify();
526            if self.poll_read_ready(cx).is_ready() {
527                st.dispatch_task.register(cx.waker());
528            }
529            st.dispatch_task.register(cx.waker());
530            Poll::Pending
531        }
532    }
533
534    #[inline]
535    /// Decode codec item from incoming bytes stream.
536    ///
537    /// Wake read task and request to read more data if data is not enough for decoding.
538    /// If error get returned this method does not register waker for later wake up action.
539    pub fn poll_recv<U>(
540        &self,
541        codec: &U,
542        cx: &mut Context<'_>,
543    ) -> Poll<Result<U::Item, RecvError<U>>>
544    where
545        U: Decoder,
546    {
547        let decoded = self.poll_recv_decode(codec, cx)?;
548
549        if let Some(item) = decoded.item {
550            Poll::Ready(Ok(item))
551        } else {
552            Poll::Pending
553        }
554    }
555
556    #[inline]
557    /// Decode codec item from incoming bytes stream.
558    ///
559    /// Wake read task and request to read more data if data is not enough for decoding.
560    /// If error get returned this method does not register waker for later wake up action.
561    pub fn poll_recv_decode<U>(
562        &self,
563        codec: &U,
564        cx: &mut Context<'_>,
565    ) -> Result<Decoded<U::Item>, RecvError<U>>
566    where
567        U: Decoder,
568    {
569        let st = self.st();
570        st.flags.unset_read_ready();
571
572        let decoded = self
573            .decode_item(codec)
574            .map_err(|err| RecvError::Decoder(err))?;
575
576        if decoded.item.is_some() {
577            Ok(decoded)
578        } else if st.flags.is_stopping() {
579            Err(RecvError::PeerGone(st.error()))
580        } else if st.flags.check_dispatcher_timeout() {
581            Err(RecvError::KeepAlive)
582        } else if st.flags.is_wr_backpressure() {
583            Err(RecvError::WriteBackpressure)
584        } else {
585            match self.poll_read_ready(cx) {
586                Poll::Pending | Poll::Ready(Ok(Some(()))) => {
587                    #[cfg(feature = "trace")]
588                    if decoded.remains != 0 {
589                        log::trace!("{}: Not enough data to decode next frame", self.tag());
590                    }
591                    Ok(decoded)
592                }
593                Poll::Ready(Err(e)) => Err(RecvError::PeerGone(Some(e))),
594                Poll::Ready(Ok(None)) => Err(RecvError::PeerGone(None)),
595            }
596        }
597    }
598
599    #[inline]
600    /// Wakes the write task and instructs it to flush data.
601    ///
602    /// If `full` is true, wakes the dispatcher when all data has been flushed;
603    /// otherwise, it wakes when the write buffer size falls below the low-watermark size.
604    pub fn poll_flush(&self, cx: &mut Context<'_>, full: bool) -> Poll<io::Result<()>> {
605        let st = self.st();
606
607        // flush filter state
608        st.buffer.process_write_buf_force(self)?;
609        self.consolidate_write_state(false);
610
611        let len = st.buffer.write_buf_size();
612        if len > 0 {
613            if st.flags.is_closed() {
614                return Poll::Ready(Err(st.error_or_disconnected()));
615            } else if full {
616                st.flags.set_wants_write_flush();
617                st.dispatch_task.register(cx.waker());
618                return Poll::Pending;
619            } else if st.is_wr_backpressure_needed(len) {
620                st.flags.set_wr_backpressure();
621                st.dispatch_task.register(cx.waker());
622                return Poll::Pending;
623            }
624        }
625        if st.flags.is_closed() && !st.flags.is_write_flush() {
626            Poll::Ready(Err(st.error_or_disconnected()))
627        } else {
628            st.flags.unset_wr_backpressure_and_flush();
629            Poll::Ready(Ok(()))
630        }
631    }
632
633    #[inline]
634    /// Gracefully shuts down the I/O stream.
635    pub fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
636        let st = self.st();
637
638        if st.flags.is_stopping() {
639            if let Some(err) = st.error() {
640                Poll::Ready(Err(err))
641            } else {
642                Poll::Ready(Ok(()))
643            }
644        } else {
645            if !st.flags.is_stopping_filters() {
646                st.start_shutdown();
647            }
648            st.flags.unset_all_read_flags();
649            st.flags.unset_read_paused();
650
651            st.wake_read_task();
652            st.dispatch_task.register(cx.waker());
653            Poll::Pending
654        }
655    }
656
657    #[inline]
658    /// Pauses the read task.
659    ///
660    /// Returns status updates.
661    pub fn poll_read_pause(&self, cx: &mut Context<'_>) -> Poll<IoStatusUpdate> {
662        self.pause();
663        self.poll_status_update(cx)
664    }
665
666    #[inline]
667    /// Polls for available status updates.
668    pub fn poll_status_update(&self, cx: &mut Context<'_>) -> Poll<IoStatusUpdate> {
669        let st = self.st();
670        st.dispatch_task.register(cx.waker());
671        if st.flags.is_closed() {
672            Poll::Ready(IoStatusUpdate::PeerGone(st.error()))
673        } else if st.flags.check_dispatcher_timeout() {
674            Poll::Ready(IoStatusUpdate::KeepAlive)
675        } else if st.flags.is_wr_backpressure() {
676            // write backpressure is enabled and write buf smaller than half
677            if st.should_disable_wr_backpressure(st.buffer.write_buf_size()) {
678                st.flags.unset_wr_backpressure();
679            }
680            Poll::Ready(IoStatusUpdate::WriteBackpressure)
681        } else {
682            Poll::Pending
683        }
684    }
685
686    #[inline]
687    /// Registers a dispatch task.
688    pub fn poll_dispatch(&self, cx: &mut Context<'_>) {
689        self.st().dispatch_task.register(cx.waker());
690    }
691}
692
693impl<F> AsRef<IoRef> for Io<F> {
694    #[inline]
695    fn as_ref(&self) -> &IoRef {
696        self.io_ref()
697    }
698}
699
700impl<F> Eq for Io<F> {}
701
702impl<F> PartialEq for Io<F> {
703    #[inline]
704    fn eq(&self, other: &Self) -> bool {
705        self.io_ref().eq(other.io_ref())
706    }
707}
708
709impl<F> hash::Hash for Io<F> {
710    #[inline]
711    fn hash<H: hash::Hasher>(&self, state: &mut H) {
712        self.io_ref().hash(state);
713    }
714}
715
716impl<F> fmt::Debug for Io<F> {
717    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
718        f.debug_struct("Io").field("state", self.st()).finish()
719    }
720}
721
722impl<F> ops::Deref for Io<F> {
723    type Target = IoRef;
724
725    #[inline]
726    fn deref(&self) -> &Self::Target {
727        self.io_ref()
728    }
729}
730
731impl<F> Drop for Io<F> {
732    fn drop(&mut self) {
733        let st = self.st();
734        self.stop_timer();
735
736        if st.filter.is_set() {
737            // filter is unsafe and must be dropped explicitly,
738            // and won't be dropped without special attention
739            if !st.flags.is_terminated() {
740                log::trace!("{}: Io is dropped, terminate connection", st.tag());
741            }
742
743            st.terminate_connection(None);
744            st.filter.drop_filter::<F>();
745        }
746
747        IoManager::unregister(self.io_ref());
748    }
749}
750
751#[derive(Debug)]
752/// The `OnDisconnect` future resolves when the I/O stream is disconnected.
753#[must_use = "OnDisconnect do nothing unless polled"]
754pub struct OnDisconnect {
755    token: usize,
756    inner: Rc<IoState>,
757}
758
759impl OnDisconnect {
760    pub(super) fn new(inner: Rc<IoState>) -> Self {
761        Self::new_inner(inner.flags.is_stopping(), inner)
762    }
763
764    fn new_inner(disconnected: bool, inner: Rc<IoState>) -> Self {
765        let token = if disconnected {
766            usize::MAX
767        } else {
768            let mut on_disconnect = inner.on_disconnect.take();
769            let token = if let Some(ref mut on_disconnect) = on_disconnect {
770                let token = on_disconnect.len();
771                on_disconnect.push(LocalWaker::default());
772                token
773            } else {
774                on_disconnect = Some(Box::new(vec![LocalWaker::default()]));
775                0
776            };
777            inner.on_disconnect.set(on_disconnect);
778            token
779        };
780        Self { token, inner }
781    }
782
783    #[inline]
784    /// Checks if the I/O stream is disconnected.
785    pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
786        if self.token == usize::MAX || self.inner.flags.is_stopping() {
787            Poll::Ready(())
788        } else if let Some(on_disconnect) = self.inner.on_disconnect.take() {
789            on_disconnect[self.token].register(cx.waker());
790            self.inner.on_disconnect.set(Some(on_disconnect));
791            Poll::Pending
792        } else {
793            Poll::Ready(())
794        }
795    }
796}
797
798impl Clone for OnDisconnect {
799    fn clone(&self) -> Self {
800        if self.token == usize::MAX {
801            OnDisconnect::new_inner(true, self.inner.clone())
802        } else {
803            OnDisconnect::new_inner(false, self.inner.clone())
804        }
805    }
806}
807
808impl Future for OnDisconnect {
809    type Output = ();
810
811    #[inline]
812    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
813        self.poll_ready(cx)
814    }
815}
816
817#[cfg(test)]
818mod tests {
819    use ntex_bytes::{BufMut, BytePages, Bytes, BytesMut};
820    use ntex_codec::BytesCodec;
821    use ntex_util::{future::lazy, time::Millis, time::sleep};
822
823    use super::*;
824    use crate::{
825        FilterBuf, IoContext, IoTaskStatus, Readiness, ops::Iops, testing::IoTest,
826    };
827
828    const BIN: &[u8] = b"GET /test HTTP/1\r\n\r\n";
829    const TEXT: &str = "GET /test HTTP/1\r\n\r\n";
830    const BIN2: &[u8] = b"12345678901234561234567890123456";
831
832    #[ntex::test]
833    async fn test_basics() {
834        let (client, server) = IoTest::create();
835        client.remote_buffer_cap(1024);
836
837        let server = Io::from(server);
838        assert!(server.eq(&server));
839        assert!(server.io_ref().eq(server.io_ref()));
840    }
841
842    #[ntex::test]
843    async fn test_recv() {
844        let (client, server) = IoTest::create();
845        client.remote_buffer_cap(1024);
846
847        let server = Io::new(server, SharedCfg::new("SRV"));
848
849        server.st().notify_timeout();
850        let err = server.recv(&BytesCodec).await.err().unwrap();
851        assert!(format!("{err:?}").contains("Timeout"));
852
853        client.write(TEXT);
854        server.st().flags.set_wr_backpressure();
855        let item = server.recv(&BytesCodec).await.ok().unwrap().unwrap();
856        assert_eq!(item, TEXT);
857    }
858
859    #[ntex::test]
860    async fn test_send() {
861        let (client, server) = IoTest::create();
862        client.remote_buffer_cap(1024);
863
864        let server = Io::from(server);
865        assert!(server.eq(&server));
866
867        server
868            .send(Bytes::from_static(BIN), &BytesCodec)
869            .await
870            .ok()
871            .unwrap();
872        let item = client.read_any();
873        assert_eq!(item, TEXT);
874    }
875
876    #[ntex::test]
877    async fn read() {
878        let io = Io::new(
879            IoTest::create().0,
880            SharedCfg::new("SRV").add(IoConfig::default().set_read_buf(8, 4, 16)),
881        );
882        assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_pending());
883        assert!(io.st().dispatch_task.is_set());
884
885        let ctx = IoContext::new(io.get_ref());
886
887        // Ready
888        assert_eq!(
889            lazy(|cx| ctx.poll_read_ready(cx)).await,
890            Poll::Ready(Readiness::Ready)
891        );
892        assert!(io.st().read_task.is_set());
893        assert!(!io.st().flags.is_read_ready());
894        assert!(!io.st().flags.is_rd_backpressure());
895
896        // == Enable backpressure
897        ctx.update_read_status(BytesMut::copy_from_slice(b"1234567890"), Ok(10));
898
899        // dispatcher is woken
900        assert!(!io.st().dispatch_task.is_set());
901        // read task is paused
902        assert!(io.st().flags.is_read_paused());
903        // read buffer is ready
904        assert!(io.st().flags.is_read_ready());
905        // read backpressure is enabled
906        assert!(io.st().flags.is_rd_backpressure());
907        // read task paused
908        assert_eq!(lazy(|cx| ctx.poll_read_ready(cx)).await, Poll::Pending);
909
910        // read one byte
911        assert_eq!(io.with_read_buf(|buf| buf.split_to(1)), b"1");
912        // read buffer is ready
913        assert!(io.st().flags.is_read_ready());
914        // read backpressure is enabled
915        assert!(io.st().flags.is_rd_backpressure());
916
917        // read task is set
918        assert!(io.st().read_task.is_set());
919
920        // read one more byte
921        assert_eq!(io.with_read_buf(|buf| buf.split_to(1)), b"2");
922        // read backpressure is enabled
923        assert!(io.st().flags.is_rd_backpressure());
924
925        // read 4 bytes. buf size is 4, less that half of high watermark
926        assert_eq!(io.with_read_buf(|buf| buf.split_to(4)), b"3456");
927        // read task is not paused anymore
928        assert!(!io.st().flags.is_read_paused());
929        // read buffer is not ready
930        assert!(!io.st().flags.is_read_ready());
931        // read backpressure is disabled
932        assert!(!io.st().flags.is_rd_backpressure());
933        // read task is woken
934        assert!(!io.st().read_task.is_set());
935        assert_eq!(
936            lazy(|cx| ctx.poll_read_ready(cx)).await,
937            Poll::Ready(Readiness::Ready)
938        );
939
940        // register dispatcher task
941        lazy(|cx| io.poll_dispatch(cx)).await;
942
943        // == Enable backpressure, 4 bytes in buffer + 4 more
944        ctx.update_read_status(BytesMut::copy_from_slice(b"1234"), Ok(4));
945
946        // dispatcher is woken
947        assert!(!io.st().dispatch_task.is_set());
948        // read task is paused
949        assert!(io.st().flags.is_read_paused());
950        // read buffer is ready
951        assert!(io.st().flags.is_read_ready());
952        // read backpressure is enabled
953        assert!(io.st().flags.is_rd_backpressure());
954        // read task paused
955        assert_eq!(lazy(|cx| ctx.poll_read_ready(cx)).await, Poll::Pending);
956
957        // read 4 bytes. buf size is 4, less that half of high watermark
958        assert_eq!(io.with_read_buf(|buf| buf.split_to(4)), b"7890");
959        // read backpressure is disabled
960        assert!(!io.st().flags.is_rd_backpressure());
961
962        // register dispatcher task
963        lazy(|cx| io.poll_dispatch(cx)).await;
964
965        // == No backpressure, 4 bytes in buffer + 3 more
966        ctx.update_read_status(BytesMut::copy_from_slice(b"567"), Ok(3));
967
968        // read task is paused
969        assert!(!io.st().flags.is_read_paused());
970        // read buffer is ready
971        assert!(io.st().flags.is_read_ready());
972        // read backpressure is enabled
973        assert!(!io.st().flags.is_rd_backpressure());
974        // read task ready
975        assert_eq!(
976            lazy(|cx| ctx.poll_read_ready(cx)).await,
977            Poll::Ready(Readiness::Ready)
978        );
979
980        // read 4 bytes. buf size is 4, less that half of high watermark
981        assert_eq!(io.with_read_buf(BytesMut::take), b"1234567");
982        // read task is paused
983        assert!(!io.st().flags.is_read_paused());
984        // read buffer is ready
985        assert!(!io.st().flags.is_read_ready());
986        // read task is not woken
987        assert!(io.st().read_task.is_set());
988
989        // == Terminate
990        io.terminate();
991        // read task is woken
992        assert!(!io.st().read_task.is_set());
993        // read task ready
994        assert_eq!(
995            lazy(|cx| ctx.poll_read_ready(cx)).await,
996            Poll::Ready(Readiness::Terminate)
997        );
998    }
999
1000    #[ntex::test]
1001    async fn read_notify() {
1002        let io = Io::new(
1003            IoTest::create().0,
1004            SharedCfg::new("SRV").add(IoConfig::default().set_read_buf(8, 4, 16)),
1005        );
1006        assert!(!io.st().flags.is_read_notify());
1007        assert!(lazy(|cx| io.poll_read_notify(cx)).await.is_pending());
1008        assert!(io.st().dispatch_task.is_set());
1009        assert!(io.st().flags.is_read_notify());
1010
1011        let ctx = IoContext::new(io.get_ref());
1012
1013        // incoming bytes
1014        ctx.update_read_status(BytesMut::copy_from_slice(b"1"), Ok(1));
1015
1016        assert!(!io.st().dispatch_task.is_set());
1017        // rd buffer is ready
1018        assert!(io.st().flags.is_read_ready());
1019        assert!(io.st().flags.is_read_notify());
1020        // dispatcher is notified
1021        assert!(io.st().flags.is_read_notified());
1022        let res = lazy(|cx| io.poll_read_notify(cx)).await;
1023        assert!(matches!(res, Poll::Ready(Ok(Some(())))));
1024
1025        // disapcher is not set
1026        assert!(!io.st().dispatch_task.is_set());
1027        // rd buffer is ready
1028        assert!(io.st().flags.is_read_ready());
1029
1030        // == start notification process again
1031        assert!(lazy(|cx| io.poll_read_notify(cx)).await.is_pending());
1032        assert!(io.st().dispatch_task.is_set());
1033        assert!(io.st().flags.is_read_notify());
1034        assert!(io.st().flags.is_read_ready());
1035        // read task ready
1036        assert_eq!(
1037            lazy(|cx| ctx.poll_read_ready(cx)).await,
1038            Poll::Ready(Readiness::Ready)
1039        );
1040
1041        // == enable packpressure
1042        ctx.update_read_status(BytesMut::copy_from_slice(b"2345678"), Ok(7));
1043        // read backpressure is enabled
1044        assert!(io.st().flags.is_rd_backpressure());
1045
1046        // rd buffer is ready
1047        assert!(io.st().flags.is_read_ready());
1048        assert!(io.st().flags.is_read_notify());
1049        // dispatcher is notified
1050        assert!(io.st().flags.is_read_notified());
1051        let res = lazy(|cx| io.poll_read_notify(cx)).await;
1052        assert!(matches!(res, Poll::Ready(Ok(Some(())))));
1053        // read task paused
1054        assert_eq!(lazy(|cx| ctx.poll_read_ready(cx)).await, Poll::Pending);
1055        // read task is set
1056        assert!(io.st().read_task.is_set());
1057
1058        // == start notification process again
1059        assert!(lazy(|cx| io.poll_read_notify(cx)).await.is_pending());
1060        // read flags active
1061        assert!(!io.st().flags.is_rd_backpressure());
1062        assert!(!io.st().flags.is_read_ready());
1063        assert!(!io.st().flags.is_read_paused());
1064        // read task is woken
1065        assert!(!io.st().read_task.is_set());
1066        // read task ready
1067        assert_eq!(
1068            lazy(|cx| ctx.poll_read_ready(cx)).await,
1069            Poll::Ready(Readiness::Ready)
1070        );
1071
1072        // incoming bytes
1073        ctx.update_read_status(BytesMut::copy_from_slice(b"1"), Ok(1));
1074        assert!(!io.st().dispatch_task.is_set());
1075        // rd buffer is ready
1076        assert!(io.st().flags.is_read_ready());
1077        assert!(io.st().flags.is_read_notify());
1078        assert!(io.st().flags.is_read_paused());
1079        assert!(io.st().flags.is_rd_backpressure());
1080        // dispatcher is notified
1081        assert!(io.st().flags.is_read_notified());
1082        assert!(matches!(
1083            lazy(|cx| io.poll_read_notify(cx)).await,
1084            Poll::Ready(Ok(Some(())))
1085        ));
1086
1087        // == Terminate
1088        io.terminate();
1089        let res = lazy(|cx| io.poll_read_notify(cx)).await;
1090        assert!(matches!(res, Poll::Ready(Ok(None))), "{res:?}");
1091    }
1092
1093    #[ntex::test]
1094    async fn read_readiness() {
1095        let (client, server) = IoTest::create();
1096        client.remote_buffer_cap(1024);
1097
1098        let io = Io::from(server);
1099        assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_pending());
1100
1101        client.write(TEXT);
1102        assert_eq!(io.read_ready().await.unwrap(), Some(()));
1103        assert!(matches!(
1104            lazy(|cx| io.poll_read_ready(cx)).await,
1105            Poll::Ready(Ok(Some(())))
1106        ));
1107
1108        let item = io.with_read_buf(BytesMut::take);
1109        assert_eq!(item, Bytes::from_static(BIN));
1110
1111        client.write(TEXT);
1112        sleep(Millis(50)).await;
1113        assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_ready());
1114        assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_ready());
1115    }
1116
1117    #[ntex::test]
1118    async fn read_backpressure() {
1119        let (client, server) = IoTest::create();
1120
1121        let io = Io::new(
1122            server,
1123            SharedCfg::new("SRV").add(IoConfig::default().set_read_buf(64, 32, 12)),
1124        );
1125        assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_pending());
1126
1127        client.write(BIN2);
1128        client.write(BIN2);
1129        sleep(Millis(50)).await;
1130        assert!(io.flags().is_read_ready());
1131        assert!(io.flags().is_rd_backpressure());
1132        let _item = io.recv(&BytesCodec).await.ok().unwrap().unwrap();
1133        assert!(!io.flags().is_read_ready());
1134        assert!(!io.flags().is_rd_backpressure());
1135
1136        client.write(BIN2);
1137        client.write(BIN2);
1138        sleep(Millis(50)).await;
1139        assert!(io.flags().is_read_ready());
1140        assert!(io.flags().is_rd_backpressure());
1141        assert_eq!(io.read_ready().await.unwrap(), Some(()));
1142    }
1143
1144    #[ntex::test]
1145    async fn write() {
1146        let io = Io::new(
1147            IoTest::create().0,
1148            SharedCfg::new("SRV").add(IoConfig::default().set_write_buf(8, 4, 16)),
1149        );
1150        assert!(lazy(|cx| io.poll_status_update(cx)).await.is_pending());
1151        assert!(io.st().dispatch_task.is_set());
1152        assert!(io.st().flags.is_direct_wr_enabled());
1153
1154        let ctx = IoContext::new(io.get_ref());
1155
1156        // == No write work
1157        assert_eq!(lazy(|cx| ctx.poll_write_ready(cx)).await, Poll::Pending);
1158        assert!(io.st().write_task.is_set());
1159        assert!(io.st().flags.is_write_paused());
1160        assert!(!io.st().flags.is_wr_backpressure());
1161
1162        // write
1163        io.with_write_buf(|buf| buf.put_slice(b"1234")).unwrap();
1164        assert_eq!(lazy(|cx| ctx.poll_write_ready(cx)).await, Poll::Pending);
1165        // write task is paused
1166        assert!(io.st().flags.is_write_paused());
1167        // send-buf op is scheduled
1168        assert!(io.st().flags.is_wr_send_scheduled());
1169        // back-pressure is not enabled
1170        assert!(!io.st().flags.is_wr_backpressure());
1171        // dispatch is not woken up
1172        assert!(io.st().dispatch_task.is_set());
1173
1174        // == enable wr backpressure
1175        io.with_write_buf(|buf| buf.put_slice(b"5678")).unwrap();
1176        // back-pressure is enabled
1177        assert!(io.st().flags.is_wr_backpressure());
1178        // dispatch is woken up
1179        assert!(!io.st().dispatch_task.is_set());
1180        // write task is set
1181        assert!(io.st().write_task.is_set());
1182        // dispatcher gets WriteBackpressure
1183        assert!(matches!(
1184            lazy(|cx| io.poll_status_update(cx)).await,
1185            Poll::Ready(IoStatusUpdate::WriteBackpressure)
1186        ));
1187        // flush write buffer
1188        assert!(lazy(|cx| io.poll_flush(cx, false)).await.is_pending());
1189        // full flush is not enabled
1190        assert!(!io.st().flags.is_write_flush());
1191
1192        // run send-buf ops
1193        Iops::run();
1194        // send-buf op is not scheduled
1195        assert!(!io.st().flags.is_wr_send_scheduled());
1196        // write task is not paused
1197        assert!(!io.st().flags.is_write_paused());
1198        // write task has been woken up
1199        assert!(!io.st().write_task.is_set());
1200        // write task can proceed
1201        assert_eq!(
1202            lazy(|cx| ctx.poll_write_ready(cx)).await,
1203            Poll::Ready(Readiness::Ready)
1204        );
1205
1206        // wrote 4 bytes to io
1207        assert_eq!(ctx.with_write_buf(|buf| buf.split_to(4).freeze()), b"1234");
1208        // continue to write
1209        assert_eq!(ctx.update_write_status(Ok(true)), IoTaskStatus::Io);
1210        // write task can proceed
1211        assert_eq!(
1212            lazy(|cx| ctx.poll_write_ready(cx)).await,
1213            Poll::Ready(Readiness::Ready)
1214        );
1215        // write task is not paused
1216        assert!(!io.st().flags.is_write_paused());
1217        // back-pressure is enabled
1218        assert!(io.st().flags.is_wr_backpressure());
1219        // dispatcher gets WriteBackpressure, buf wr-backpressure flags is removed
1220        assert!(matches!(
1221            lazy(|cx| io.poll_status_update(cx)).await,
1222            Poll::Ready(IoStatusUpdate::WriteBackpressure)
1223        ));
1224        // back-pressure is disabled
1225        assert!(!io.st().flags.is_wr_backpressure());
1226        assert!(lazy(|cx| io.poll_status_update(cx)).await.is_pending());
1227        // write buffer is flushed
1228        assert!(matches!(
1229            lazy(|cx| io.poll_flush(cx, false)).await,
1230            Poll::Ready(Ok(()))
1231        ));
1232
1233        // full flush write buffer
1234        io.with_write_buf(|buf| buf.put_slice(b"1234")).unwrap();
1235        assert!(lazy(|cx| io.poll_flush(cx, true)).await.is_pending());
1236        // full flush is enabled
1237        assert!(io.st().flags.is_write_flush());
1238        // back-pressure is enabled
1239        assert!(io.st().flags.is_wr_backpressure());
1240
1241        // wrote all data
1242        Iops::run();
1243        assert_eq!(ctx.with_write_buf(BytePages::freeze), b"56781234");
1244        // write task is not paused, so send-buf op is not scheduled
1245        assert!(!io.st().flags.is_wr_send_scheduled());
1246        // update status, no more work
1247        assert_eq!(ctx.update_write_status(Ok(true)), IoTaskStatus::Pause);
1248        // write task is paused
1249        assert!(io.st().flags.is_write_paused());
1250        // flush is still enabled
1251        assert!(io.st().flags.is_write_flush());
1252        // back-pressure is still enabled
1253        assert!(io.st().flags.is_wr_backpressure());
1254        // dispatch is woken up
1255        assert!(!io.st().dispatch_task.is_set());
1256
1257        // write buffer is flushed
1258        assert!(matches!(
1259            lazy(|cx| io.poll_flush(cx, false)).await,
1260            Poll::Ready(Ok(()))
1261        ));
1262        // full flush is disabled
1263        assert!(!io.st().flags.is_write_flush());
1264        // back-pressure is disabled
1265        assert!(!io.st().flags.is_wr_backpressure());
1266
1267        // == Terminate
1268        io.terminate();
1269        // read task is woken
1270        assert!(!io.st().write_task.is_set());
1271        // write task ready
1272        assert_eq!(
1273            lazy(|cx| ctx.poll_write_ready(cx)).await,
1274            Poll::Ready(Readiness::Terminate)
1275        );
1276        // flush returns error
1277        let Poll::Ready(Err(err)) = lazy(|cx| io.poll_flush(cx, false)).await else {
1278            panic!()
1279        };
1280        assert_eq!(err.kind(), io::ErrorKind::NotConnected);
1281        // statis returns error
1282        assert!(matches!(
1283            lazy(|cx| io.poll_status_update(cx)).await,
1284            Poll::Ready(IoStatusUpdate::PeerGone(None))
1285        ));
1286    }
1287
1288    #[ntex::test]
1289    async fn write_backpressure() {
1290        let (client, server) = IoTest::create();
1291        client.remote_buffer_cap(0);
1292
1293        let io = Io::new(
1294            server,
1295            SharedCfg::new("SRV").add(IoConfig::default().set_write_buf(16, 8, 12)),
1296        );
1297        assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_pending());
1298        assert!(io.flags().is_write_paused());
1299        assert!(!io.flags().is_wr_backpressure());
1300        assert!(!io.is_wr_backpressure());
1301
1302        io.encode_slice(BIN2).unwrap();
1303        assert!(Iops::is_registered(&io));
1304        assert!(io.flags().is_wr_backpressure());
1305
1306        client.remote_buffer_cap(1024);
1307        let item = client.read().await.unwrap();
1308        assert_eq!(item, BIN2);
1309        assert!(io.flags().is_wr_backpressure());
1310        assert!(matches!(
1311            lazy(|cx| io.poll_status_update(cx)).await,
1312            Poll::Ready(IoStatusUpdate::WriteBackpressure)
1313        ));
1314        assert!(!io.flags().is_wr_backpressure());
1315        assert!(matches!(
1316            lazy(|cx| io.poll_flush(cx, false)).await,
1317            Poll::Ready(Ok(()))
1318        ));
1319        assert!(!io.flags().is_wr_backpressure());
1320    }
1321
1322    #[ntex::test]
1323    async fn shutdown() {
1324        // layer drops all unprocessed data after filter shutdown
1325        #[derive(Debug)]
1326        struct F;
1327
1328        impl FilterLayer for F {
1329            fn process_read_buf(&self, _: &FilterBuf<'_>) -> io::Result<()> {
1330                Ok(())
1331            }
1332            fn process_write_buf(&self, _: &FilterBuf<'_>) -> io::Result<()> {
1333                Ok(())
1334            }
1335        }
1336
1337        let io = Io::new(
1338            IoTest::create().0,
1339            SharedCfg::new("SRV").add(IoConfig::default().set_write_buf(8, 4, 16)),
1340        );
1341        let st = io.st();
1342        assert!(lazy(|cx| io.poll_status_update(cx)).await.is_pending());
1343        assert!(st.dispatch_task.is_set());
1344        assert!(!st.flags.is_closed());
1345        assert!(!st.flags.is_stopping_filters());
1346
1347        let ctx = IoContext::new(io.get_ref());
1348
1349        // == init shutdown
1350        io.close();
1351        assert!(!st.flags.is_closed());
1352        assert!(st.flags.is_stopping_filters());
1353        // encoding is not allowed in shutting down stage
1354        let err = io.with_write_buf(|_| 1).unwrap_err();
1355        assert_eq!(err.kind(), io::ErrorKind::Other);
1356
1357        let io = io.add_filter(F);
1358        let layer = Layer::new(F, Base::new(io.get_ref()));
1359
1360        let st = io.st();
1361        st.buffer.with_write_src(|p| p.put_slice(b"123"));
1362        assert_eq!(st.buffer.write_buf_size(), 3);
1363        let res = st.buffer.with_filter(io.as_ref(), |f| layer.shutdown(f));
1364        assert!(matches!(res, Ok(Poll::Ready(()))));
1365        assert_eq!(st.buffer.write_buf_size(), 0);
1366
1367        // == terminate
1368        ctx.stop(None);
1369        assert!(st.flags.is_closed());
1370        assert!(st.flags.is_terminated());
1371        assert!(st.flags.is_stopping_filters());
1372
1373        let err = io.with_write_buf(|_| 1).unwrap_err();
1374        assert_eq!(err.kind(), io::ErrorKind::NotConnected);
1375    }
1376}