ntex_io/
tasks.rs

1use std::{cell::Cell, fmt, future::poll_fn, io, task::ready, task::Context, task::Poll};
2
3use ntex_bytes::{Buf, BufMut, BytesVec};
4use ntex_util::{future::lazy, future::select, future::Either, time::sleep, time::Sleep};
5
6use crate::{AsyncRead, AsyncWrite, Flags, IoRef, ReadStatus, WriteStatus};
7
/// Context for io read task
///
/// Field `0` is the shared io handle. Field `1` caches the `Sleep` timer that
/// `shutdown_filters` uses to bound how long a pending filter shutdown may take.
pub struct ReadContext(IoRef, Cell<Option<Sleep>>);
10
11impl fmt::Debug for ReadContext {
12    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
13        f.debug_struct("ReadContext").field("io", &self.0).finish()
14    }
15}
16
17impl ReadContext {
    /// Create a read context for `io`.
    ///
    /// The handle is cloned and no filter-shutdown timer is stored yet.
    pub(crate) fn new(io: &IoRef) -> Self {
        Self(io.clone(), Cell::new(None))
    }
21
    #[doc(hidden)]
    #[inline]
    /// Create an `IoContext` that refers to the same io object
    pub fn context(&self) -> IoContext {
        IoContext::new(&self.0)
    }
28
    #[inline]
    /// Io tag, as reported by the underlying `IoRef`
    pub fn tag(&self) -> &'static str {
        self.0.tag()
    }
34
35    /// Wait when io get closed or preparing for close
36    async fn wait_for_close(&self) {
37        poll_fn(|cx| {
38            let flags = self.0.flags();
39
40            if flags.intersects(Flags::IO_STOPPING | Flags::IO_STOPPED) {
41                Poll::Ready(())
42            } else {
43                self.0 .0.read_task.register(cx.waker());
44                if flags.contains(Flags::IO_STOPPING_FILTERS) {
45                    self.shutdown_filters(cx);
46                }
47                Poll::Pending
48            }
49        })
50        .await
51    }
52
    /// Handle read io operations
    ///
    /// Runs the read loop for `io`: waits until the filter reports read
    /// readiness, reads into a buffer, stores the data as the read source,
    /// lets the filter process it and wakes the dispatcher when new data
    /// (or an error) is available. The loop ends when the filter reports
    /// `ReadStatus::Terminate`, when the io object is closing, on end of
    /// stream (`Ok(0)`) or on a read error.
    pub async fn handle<T>(&self, io: &mut T)
    where
        T: AsyncRead,
    {
        let inner = &self.0 .0;

        loop {
            // wait until the filter allows reading
            let result = poll_fn(|cx| self.0.filter().poll_read_ready(cx)).await;
            if result == ReadStatus::Terminate {
                log::trace!("{}: Read task is instructed to shutdown", self.tag());
                break;
            }

            let mut buf = if inner.flags.get().is_read_buf_ready() {
                // read buffer is still not read by dispatcher
                // we cannot touch it
                inner.pool.get().get_read_buf()
            } else {
                // reuse the current read source, or fall back to a pool buffer
                inner
                    .buffer
                    .get_read_source()
                    .unwrap_or_else(|| inner.pool.get().get_read_buf())
            };

            // make sure we've got room
            let (hw, lw) = self.0.memory_pool().read_params().unpack();
            let remaining = buf.remaining_mut();
            if remaining <= lw {
                buf.reserve(hw - remaining);
            }
            // length before the read, used below to compute the bytes received
            let total = buf.len();

            // read from io, racing against the io object being closed
            let (buf, result) = match select(io.read(buf), self.wait_for_close()).await {
                Either::Left(res) => res,
                Either::Right(_) => {
                    log::trace!("{}: Read io is closed, stop read task", self.tag());
                    break;
                }
            };

            // handle incoming data
            let total2 = buf.len();
            let nbytes = total2.saturating_sub(total);
            let total = total2;

            // store the data as read source; if a read source is present
            // the new data is appended to it
            if let Some(mut first_buf) = inner.buffer.get_read_source() {
                first_buf.extend_from_slice(&buf);
                inner.buffer.set_read_source(&self.0, first_buf);
            } else {
                inner.buffer.set_read_source(&self.0, buf);
            }

            // handle buffer changes
            if nbytes > 0 {
                let filter = self.0.filter();
                let res = match filter.process_read_buf(&self.0, &inner.buffer, 0, nbytes) {
                    Ok(status) => {
                        if status.nbytes > 0 {
                            // check read back-pressure
                            if hw < inner.buffer.read_destination_size() {
                                log::trace!(
                                "{}: Io read buffer is too large {}, enable read back-pressure",
                                self.0.tag(),
                                total
                            );
                                inner.insert_flags(Flags::BUF_R_READY | Flags::BUF_R_FULL);
                            } else {
                                inner.insert_flags(Flags::BUF_R_READY);
                            }
                            log::trace!(
                                "{}: New {} bytes available, wakeup dispatcher",
                                self.0.tag(),
                                nbytes
                            );
                            // dest buffer has new data, wake up dispatcher
                            inner.dispatch_task.wake();
                        } else if inner.flags.get().is_waiting_for_read() {
                            // in case of "notify" we must wake up dispatch task
                            // if we read any data from source
                            inner.dispatch_task.wake();
                        }

                        // while reading, filter wrote some data
                        // in that case filters need to process write buffers
                        // and potentially wake write task
                        if status.need_write {
                            filter.process_write_buf(&self.0, &inner.buffer, 0)
                        } else {
                            Ok(())
                        }
                    }
                    Err(err) => Err(err),
                };

                // filter failure: stop io with the error and wake the dispatcher
                if let Err(err) = res {
                    inner.dispatch_task.wake();
                    inner.io_stopped(Some(err));
                    inner.insert_flags(Flags::BUF_R_READY);
                }
            }

            match result {
                Ok(0) => {
                    // end of stream
                    log::trace!("{}: Tcp stream is disconnected", self.tag());
                    inner.io_stopped(None);
                    break;
                }
                Ok(_) => {
                    // filter shutdown was requested, drive it forward
                    if inner.flags.get().contains(Flags::IO_STOPPING_FILTERS) {
                        lazy(|cx| self.shutdown_filters(cx)).await;
                    }
                }
                Err(err) => {
                    log::trace!("{}: Read task failed on io {:?}", self.tag(), err);
                    inner.io_stopped(Some(err));
                    break;
                }
            }
        }
    }
175
176    fn shutdown_filters(&self, cx: &mut Context<'_>) {
177        let st = &self.0 .0;
178        let filter = self.0.filter();
179
180        match filter.shutdown(&self.0, &st.buffer, 0) {
181            Ok(Poll::Ready(())) => {
182                st.dispatch_task.wake();
183                st.insert_flags(Flags::IO_STOPPING);
184            }
185            Ok(Poll::Pending) => {
186                let flags = st.flags.get();
187
188                // check read buffer, if buffer is not consumed it is unlikely
189                // that filter will properly complete shutdown
190                if flags.contains(Flags::RD_PAUSED)
191                    || flags.contains(Flags::BUF_R_FULL | Flags::BUF_R_READY)
192                {
193                    st.dispatch_task.wake();
194                    st.insert_flags(Flags::IO_STOPPING);
195                } else {
196                    // filter shutdown timeout
197                    let timeout = self
198                        .1
199                        .take()
200                        .unwrap_or_else(|| sleep(st.disconnect_timeout.get()));
201                    if timeout.poll_elapsed(cx).is_ready() {
202                        st.dispatch_task.wake();
203                        st.insert_flags(Flags::IO_STOPPING);
204                    } else {
205                        self.1.set(Some(timeout));
206                    }
207                }
208            }
209            Err(err) => {
210                st.io_stopped(Some(err));
211            }
212        }
213        if let Err(err) = filter.process_write_buf(&self.0, &st.buffer, 0) {
214            st.io_stopped(Some(err));
215        }
216    }
217}
218
#[derive(Debug)]
/// Context for io write task; wraps the shared `IoRef` handle
pub struct WriteContext(IoRef);
222
#[derive(Debug)]
/// Context buf for io write task
///
/// Gives the write side access to pending write data: `take` hands a buffer
/// out and `set` hands the unwritten remainder back.
pub struct WriteContextBuf {
    // io object whose write destination buffer and flags are used
    io: IoRef,
    // buffer retained locally by `set`; `take` returns it before looking at
    // the io object's write destination
    buf: Option<BytesVec>,
}
229
230impl WriteContext {
    /// Create a write context holding a clone of the `io` handle
    pub(crate) fn new(io: &IoRef) -> Self {
        Self(io.clone())
    }
234
    #[inline]
    /// Io tag, as reported by the underlying `IoRef`
    pub fn tag(&self) -> &'static str {
        self.0.tag()
    }
240
    /// Wait until the filter reports a write status and return it
    async fn ready(&self) -> WriteStatus {
        poll_fn(|cx| self.0.filter().poll_write_ready(cx)).await
    }
245
    /// Indicate that write io task is stopped
    ///
    /// `err` is forwarded to the io state as the optional stop reason.
    fn close(&self, err: Option<io::Error>) {
        self.0 .0.io_stopped(err);
    }
250
251    /// Check if io is closed
252    async fn when_stopped(&self) {
253        poll_fn(|cx| {
254            if self.0.flags().is_stopped() {
255                Poll::Ready(())
256            } else {
257                self.0 .0.write_task.register(cx.waker());
258                Poll::Pending
259            }
260        })
261        .await
262    }
263
264    /// Handle write io operations
265    pub async fn handle<T>(&self, io: &mut T)
266    where
267        T: AsyncWrite,
268    {
269        let mut buf = WriteContextBuf {
270            io: self.0.clone(),
271            buf: None,
272        };
273
274        loop {
275            match self.ready().await {
276                WriteStatus::Ready => {
277                    // write io stream
278                    match select(io.write(&mut buf), self.when_stopped()).await {
279                        Either::Left(Ok(_)) => continue,
280                        Either::Left(Err(e)) => self.close(Some(e)),
281                        Either::Right(_) => return,
282                    }
283                }
284                WriteStatus::Shutdown => {
285                    log::trace!("{}: Write task is instructed to shutdown", self.tag());
286
287                    let fut = async {
288                        // write io stream
289                        io.write(&mut buf).await?;
290                        io.flush().await?;
291                        io.shutdown().await?;
292                        Ok(())
293                    };
294                    match select(sleep(self.0 .0.disconnect_timeout.get()), fut).await {
295                        Either::Left(_) => self.close(None),
296                        Either::Right(res) => self.close(res.err()),
297                    }
298                }
299                WriteStatus::Terminate => {
300                    log::trace!("{}: Write task is instructed to terminate", self.tag());
301                    self.close(io.shutdown().await.err());
302                }
303            }
304            return;
305        }
306    }
307}
308
309impl WriteContextBuf {
    /// Hand a write buffer back after a write attempt.
    ///
    /// An empty `buf` is released to the memory pool. Otherwise the buffer
    /// is either merged with the locally retained one, or installed as the
    /// io write destination; if a destination buffer was already set, the
    /// value returned by `set_write_destination` is retained locally.
    /// Afterwards the write flags are updated from the total pending length.
    pub fn set(&mut self, mut buf: BytesVec) {
        if buf.is_empty() {
            self.io.memory_pool().release_write_buf(buf);
        } else if let Some(b) = self.buf.take() {
            // append the retained data after `buf` and keep the result locally
            buf.extend_from_slice(&b);
            self.io.memory_pool().release_write_buf(b);
            self.buf = Some(buf);
        } else if let Some(b) = self.io.0.buffer.set_write_destination(buf) {
            // write buffer is already set
            self.buf = Some(b);
        }

        // pending length = locally retained data + io write destination;
        // back-pressure is turned off once it drops below twice the
        // high watermark value
        let inner = &self.io.0;
        let len = self.buf.as_ref().map(|b| b.len()).unwrap_or_default()
            + inner.buffer.write_destination_size();
        let mut flags = inner.flags.get();

        if len == 0 {
            // everything is written: notify a waiting dispatcher and
            // mark the write side as paused
            if flags.is_waiting_for_write() {
                flags.waiting_for_write_is_done();
                inner.dispatch_task.wake();
            }
            flags.insert(Flags::WR_PAUSED);
            inner.flags.set(flags);
        } else if flags.contains(Flags::BUF_W_BACKPRESSURE)
            && len < inner.pool.get().write_params_high() << 1
        {
            flags.remove(Flags::BUF_W_BACKPRESSURE);
            inner.flags.set(flags);
            inner.dispatch_task.wake();
        }
    }
343
344    pub fn take(&mut self) -> Option<BytesVec> {
345        if let Some(buf) = self.buf.take() {
346            Some(buf)
347        } else {
348            self.io.0.buffer.get_write_destination()
349        }
350    }
351}
352
/// Context for io tasks
///
/// Wraps the shared `IoRef` handle and exposes both the read-side and the
/// write-side buffer helpers defined below.
pub struct IoContext(IoRef);
355
356impl fmt::Debug for IoContext {
357    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
358        f.debug_struct("IoContext").field("io", &self.0).finish()
359    }
360}
361
362impl IoContext {
    /// Create an io context holding a clone of the `io` handle
    pub(crate) fn new(io: &IoRef) -> Self {
        Self(io.clone())
    }
366
    #[inline]
    /// Io tag, as reported by the underlying `IoRef`
    pub fn tag(&self) -> &'static str {
        self.0.tag()
    }
372
    #[doc(hidden)]
    /// Current io flags, as returned by the underlying `IoRef`
    pub fn flags(&self) -> crate::flags::Flags {
        self.0.flags()
    }
378
    #[inline]
    /// Check readiness for read operations
    ///
    /// Calls `shutdown_filters` first (defined outside the visible part of
    /// this file), then asks the filter for the read status.
    pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<ReadStatus> {
        self.shutdown_filters();
        self.0.filter().poll_read_ready(cx)
    }
385
    #[inline]
    /// Check readiness for write operations
    ///
    /// Delegates directly to the filter's `poll_write_ready`.
    pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<WriteStatus> {
        self.0.filter().poll_write_ready(cx)
    }
391
    #[inline]
    /// Mark io as stopped, forwarding the optional error `e` as the reason
    pub fn stopped(&self, e: Option<io::Error>) {
        self.0 .0.io_stopped(e);
    }
397
398    /// Wait when io get closed or preparing for close
399    pub async fn shutdown(&self, flush_buf: bool) {
400        let st = &self.0 .0;
401        let mut timeout = None;
402
403        poll_fn(|cx| {
404            let flags = self.0.flags();
405
406            if flags.intersects(Flags::IO_STOPPING | Flags::IO_STOPPED) {
407                Poll::Ready(())
408            } else {
409                st.write_task.register(cx.waker());
410                if flags.contains(Flags::IO_STOPPING_FILTERS) {
411                    if timeout.is_none() {
412                        timeout = Some(sleep(st.disconnect_timeout.get()));
413                    }
414                    if timeout.as_ref().unwrap().poll_elapsed(cx).is_ready() {
415                        st.dispatch_task.wake();
416                        st.insert_flags(Flags::IO_STOPPING);
417                        return Poll::Ready(());
418                    }
419                }
420                Poll::Pending
421            }
422        })
423        .await;
424
425        if flush_buf && !self.0.flags().contains(Flags::WR_PAUSED) {
426            st.insert_flags(Flags::WR_TASK_WAIT);
427
428            poll_fn(|cx| {
429                let flags = self.0.flags();
430
431                if flags.intersects(Flags::WR_PAUSED | Flags::IO_STOPPED) {
432                    Poll::Ready(())
433                } else {
434                    st.write_task.register(cx.waker());
435
436                    if timeout.is_none() {
437                        timeout = Some(sleep(st.disconnect_timeout.get()));
438                    }
439                    if timeout.as_ref().unwrap().poll_elapsed(cx).is_ready() {
440                        Poll::Ready(())
441                    } else {
442                        Poll::Pending
443                    }
444                }
445            })
446            .await;
447        }
448    }
449
450    /// Get read buffer
451    pub fn get_read_buf(&self) -> Poll<BytesVec> {
452        let inner = &self.0 .0;
453
454        if let Some(waker) = inner.read_task.take() {
455            let mut cx = Context::from_waker(&waker);
456
457            if let Poll::Ready(ReadStatus::Ready) = self.0.filter().poll_read_ready(&mut cx)
458            {
459                let mut buf = if inner.flags.get().is_read_buf_ready() {
460                    // read buffer is still not read by dispatcher
461                    // we cannot touch it
462                    inner.pool.get().get_read_buf()
463                } else {
464                    inner
465                        .buffer
466                        .get_read_source()
467                        .unwrap_or_else(|| inner.pool.get().get_read_buf())
468                };
469
470                // make sure we've got room
471                let (hw, lw) = self.0.memory_pool().read_params().unpack();
472                let remaining = buf.remaining_mut();
473                if remaining < lw {
474                    buf.reserve(hw - remaining);
475                }
476                return Poll::Ready(buf);
477            }
478        }
479
480        Poll::Pending
481    }
482
483    pub fn release_read_buf(&self, buf: BytesVec) {
484        let inner = &self.0 .0;
485        if let Some(mut first_buf) = inner.buffer.get_read_source() {
486            first_buf.extend_from_slice(&buf);
487            inner.buffer.set_read_source(&self.0, first_buf);
488        } else {
489            inner.buffer.set_read_source(&self.0, buf);
490        }
491    }
492
    /// Set read buffer
    ///
    /// Stores `buf` as the read source (appending to an existing one) and
    /// handles the outcome of the read that filled it:
    ///
    /// * `Ok(0)` - io is stopped without an error, returns `Poll::Ready`
    /// * `Ok(nbytes)` - the filter processes the new data, back-pressure
    ///   flags are updated and the dispatcher is woken as needed; returns
    ///   `Poll::Pending`, or `Poll::Ready` if the filter fails
    /// * `Err(e)` - io is stopped with the error, returns `Poll::Ready`
    pub fn set_read_buf(&self, result: io::Result<usize>, buf: BytesVec) -> Poll<()> {
        let inner = &self.0 .0;
        let (hw, _) = self.0.memory_pool().read_params().unpack();

        // store the data as read source, appending to an existing one
        if let Some(mut first_buf) = inner.buffer.get_read_source() {
            first_buf.extend_from_slice(&buf);
            inner.buffer.set_read_source(&self.0, first_buf);
        } else {
            inner.buffer.set_read_source(&self.0, buf);
        }

        match result {
            Ok(0) => {
                inner.io_stopped(None);
                Poll::Ready(())
            }
            Ok(nbytes) => {
                let filter = self.0.filter();
                let res = filter
                    .process_read_buf(&self.0, &inner.buffer, 0, nbytes)
                    .and_then(|status| {
                        if status.nbytes > 0 {
                            // dest buffer has new data, wake up dispatcher
                            if inner.buffer.read_destination_size() >= hw {
                                log::trace!(
                                    "{}: Io read buffer is too large {}, enable read back-pressure",
                                    self.0.tag(),
                                    nbytes
                                );
                                inner.insert_flags(Flags::BUF_R_READY | Flags::BUF_R_FULL);
                            } else {
                                inner.insert_flags(Flags::BUF_R_READY);

                                if nbytes >= hw {
                                    // read task is paused because of read back-pressure
                                    // but there is no new data in top most read buffer
                                    // so we need to wake up read task to read more data
                                    // otherwise read task would sleep forever
                                    inner.read_task.wake();
                                }
                            }
                            log::trace!(
                                "{}: New {} bytes available, wakeup dispatcher",
                                self.0.tag(),
                                nbytes
                            );
                            inner.dispatch_task.wake();
                        } else {
                            if nbytes >= hw {
                                // read task is paused because of read back-pressure
                                // but there is no new data in top most read buffer
                                // so we need to wake up read task to read more data
                                // otherwise read task would sleep forever
                                inner.read_task.wake();
                            }
                            if inner.flags.get().is_waiting_for_read() {
                                // in case of "notify" we must wake up dispatch task
                                // if we read any data from source
                                inner.dispatch_task.wake();
                            }
                        }

                        // while reading, filter wrote some data
                        // in that case filters need to process write buffers
                        // and potentially wake write task
                        if status.need_write {
                            inner.write_task.wake();
                            filter.process_write_buf(&self.0, &inner.buffer, 0)
                        } else {
                            Ok(())
                        }
                    });

                if let Err(err) = res {
                    // filter failure stops io with that error
                    inner.io_stopped(Some(err));
                    Poll::Ready(())
                } else {
                    self.shutdown_filters();
                    Poll::Pending
                }
            }
            Err(e) => {
                inner.io_stopped(Some(e));
                Poll::Ready(())
            }
        }
    }
581
582    /// Get write buffer
583    pub fn get_write_buf(&self) -> Poll<BytesVec> {
584        let inner = &self.0 .0;
585
586        // check write readiness
587        if let Some(waker) = inner.write_task.take() {
588            let ready = self
589                .0
590                .filter()
591                .poll_write_ready(&mut Context::from_waker(&waker));
592            let buf = if matches!(
593                ready,
594                Poll::Ready(WriteStatus::Ready | WriteStatus::Shutdown)
595            ) {
596                inner.buffer.get_write_destination().and_then(|buf| {
597                    if buf.is_empty() {
598                        None
599                    } else {
600                        Some(buf)
601                    }
602                })
603            } else {
604                None
605            };
606
607            if let Some(buf) = buf {
608                return Poll::Ready(buf);
609            }
610        }
611        Poll::Pending
612    }
613
614    pub fn release_write_buf(&self, mut buf: BytesVec) {
615        let inner = &self.0 .0;
616
617        if let Some(b) = inner.buffer.get_write_destination() {
618            buf.extend_from_slice(&b);
619            self.0.memory_pool().release_write_buf(b);
620        }
621        inner.buffer.set_write_destination(buf);
622
623        // if write buffer is smaller than high watermark value, turn off back-pressure
624        let len = inner.buffer.write_destination_size();
625        let mut flags = inner.flags.get();
626
627        if len == 0 {
628            if flags.is_waiting_for_write() {
629                flags.waiting_for_write_is_done();
630                inner.dispatch_task.wake();
631            }
632            flags.insert(Flags::WR_PAUSED);
633            inner.flags.set(flags);
634        } else if flags.contains(Flags::BUF_W_BACKPRESSURE)
635            && len < inner.pool.get().write_params_high() << 1
636        {
637            flags.remove(Flags::BUF_W_BACKPRESSURE);
638            inner.flags.set(flags);
639            inner.dispatch_task.wake();
640        }
641        inner.flags.set(flags);
642    }
643
644    /// Set write buffer
645    pub fn set_write_buf(&self, result: io::Result<usize>, mut buf: BytesVec) -> Poll<()> {
646        let result = match result {
647            Ok(0) => {
648                log::trace!("{}: Disconnected during flush", self.tag());
649                Err(io::Error::new(
650                    io::ErrorKind::WriteZero,
651                    "failed to write frame to transport",
652                ))
653            }
654            Ok(n) => {
655                if n == buf.len() {
656                    buf.clear();
657                    Ok(0)
658                } else {
659                    buf.advance(n);
660                    Ok(buf.len())
661                }
662            }
663            Err(e) => Err(e),
664        };
665
666        let inner = &self.0 .0;
667
668        // set buffer back
669        let result = match result {
670            Ok(0) => {
671                // log::debug!("{}: WROTE ALL {:?}", self.0.tag(), inner.buffer.write_destination_size());
672                self.0.memory_pool().release_write_buf(buf);
673                Ok(inner.buffer.write_destination_size())
674            }
675            Ok(_) => {
676                if let Some(b) = inner.buffer.get_write_destination() {
677                    buf.extend_from_slice(&b);
678                    self.0.memory_pool().release_write_buf(b);
679                }
680                let l = buf.len();
681                // log::debug!("{}: WROTE SOME {:?}", self.0.tag(), l);
682                inner.buffer.set_write_destination(buf);
683                Ok(l)
684            }
685            Err(e) => Err(e),
686        };
687
688        let mut flags = inner.flags.get();
689        match result {
690            Ok(0) => {
691                // all data has been written
692                flags.insert(Flags::WR_PAUSED);
693
694                if flags.is_task_waiting_for_write() {
695                    flags.task_waiting_for_write_is_done();
696                    inner.write_task.wake();
697                }
698
699                if flags.is_waiting_for_write() {
700                    flags.waiting_for_write_is_done();
701                    inner.dispatch_task.wake();
702                }
703                inner.flags.set(flags);
704                Poll::Ready(())
705            }
706            Ok(len) => {
707                // if write buffer is smaller than high watermark value, turn off back-pressure
708                if flags.contains(Flags::BUF_W_BACKPRESSURE)
709                    && len < inner.pool.get().write_params_high() << 1
710                {
711                    flags.remove(Flags::BUF_W_BACKPRESSURE);
712                    inner.flags.set(flags);
713                    inner.dispatch_task.wake();
714                }
715                Poll::Pending
716            }
717            Err(e) => {
718                inner.io_stopped(Some(e));
719                Poll::Ready(())
720            }
721        }
722    }
723
    /// Run `f` against the read buffer and process the outcome.
    ///
    /// The work is done by `with_read_buf_inner`. When that returns
    /// `Poll::Pending`, the registered read task waker (if any) is taken and
    /// used to poll the filter for read readiness again.
    ///
    /// NOTE(review): when the inner result is pending, every path below
    /// returns `Poll::Pending`, so the readiness check only matters for its
    /// side effects - confirm whether a non-`Ready` read status was meant to
    /// produce `Poll::Ready(())`, as `with_write_buf` does for writes.
    pub fn with_read_buf<F>(&self, f: F) -> Poll<()>
    where
        F: FnOnce(&mut BytesVec) -> Poll<io::Result<usize>>,
    {
        let result = self.with_read_buf_inner(f);

        // check read readiness
        if result.is_pending() {
            if let Some(waker) = self.0 .0.read_task.take() {
                let mut cx = Context::from_waker(&waker);

                if let Poll::Ready(ReadStatus::Ready) =
                    self.0.filter().poll_read_ready(&mut cx)
                {
                    return Poll::Pending;
                }
            }
        }
        result
    }
745
    /// Run `f` against the read source buffer and handle its result.
    ///
    /// Spare capacity is reserved according to the pool's read parameters
    /// before `f` is called. Depending on what `f` returns:
    ///
    /// * `Ready(Ok(0))` - io is stopped without an error, returns `Poll::Ready`
    /// * `Ready(Ok(nbytes))` - the filter processes the new data, flags are
    ///   updated and tasks are woken as needed; a filter error stops io.
    ///   Returns `Poll::Pending` in either case
    /// * `Ready(Err(e))` - io is stopped with the error, returns `Poll::Ready`
    /// * `Pending` - `shutdown_filters` is called, returns `Poll::Pending`
    fn with_read_buf_inner<F>(&self, f: F) -> Poll<()>
    where
        F: FnOnce(&mut BytesVec) -> Poll<io::Result<usize>>,
    {
        let inner = &self.0 .0;
        let (hw, lw) = self.0.memory_pool().read_params().unpack();
        let result = inner.buffer.with_read_source(&self.0, |buf| {
            // make sure we've got room
            let remaining = buf.remaining_mut();
            if remaining < lw {
                buf.reserve(hw - remaining);
            }

            f(buf)
        });

        // handle buffer changes
        match result {
            Poll::Ready(Ok(0)) => {
                inner.io_stopped(None);
                Poll::Ready(())
            }
            Poll::Ready(Ok(nbytes)) => {
                let filter = self.0.filter();
                let _ = filter
                    .process_read_buf(&self.0, &inner.buffer, 0, nbytes)
                    .and_then(|status| {
                        if status.nbytes > 0 {
                            // dest buffer has new data, wake up dispatcher
                            if inner.buffer.read_destination_size() >= hw {
                                log::trace!(
                                    "{}: Io read buffer is too large {}, enable read back-pressure",
                                    self.0.tag(),
                                    nbytes
                                );
                                inner.insert_flags(Flags::BUF_R_READY | Flags::BUF_R_FULL);
                            } else {
                                inner.insert_flags(Flags::BUF_R_READY);

                                if nbytes >= hw {
                                    // read task is paused because of read back-pressure
                                    // but there is no new data in top most read buffer
                                    // so we need to wake up read task to read more data
                                    // otherwise read task would sleep forever
                                    inner.read_task.wake();
                                }
                            }
                            log::trace!(
                                "{}: New {} bytes available, wakeup dispatcher",
                                self.0.tag(),
                                nbytes
                            );
                            inner.dispatch_task.wake();
                        } else {
                            if nbytes >= hw {
                                // read task is paused because of read back-pressure
                                // but there is no new data in top most read buffer
                                // so we need to wake up read task to read more data
                                // otherwise read task would sleep forever
                                inner.read_task.wake();
                            }
                            if inner.flags.get().is_waiting_for_read() {
                                // in case of "notify" we must wake up dispatch task
                                // if we read any data from source
                                inner.dispatch_task.wake();
                            }
                        }

                        // while reading, filter wrote some data
                        // in that case filters need to process write buffers
                        // and potentially wake write task
                        if status.need_write {
                            filter.process_write_buf(&self.0, &inner.buffer, 0)
                        } else {
                            Ok(())
                        }
                    })
                    .map_err(|err| {
                        // filter failure: stop io with the error and wake the dispatcher
                        inner.dispatch_task.wake();
                        inner.io_stopped(Some(err));
                        inner.insert_flags(Flags::BUF_R_READY);
                    });
                Poll::Pending
            }
            Poll::Ready(Err(e)) => {
                inner.io_stopped(Some(e));
                Poll::Ready(())
            }
            Poll::Pending => {
                self.shutdown_filters();
                Poll::Pending
            }
        }
    }
840
841    pub fn with_write_buf<F>(&self, f: F) -> Poll<()>
842    where
843        F: FnOnce(&BytesVec) -> Poll<io::Result<usize>>,
844    {
845        let result = self.with_write_buf_inner(f);
846
847        // check write readiness
848        if result.is_pending() {
849            let inner = &self.0 .0;
850            if let Some(waker) = inner.write_task.take() {
851                let ready = self
852                    .0
853                    .filter()
854                    .poll_write_ready(&mut Context::from_waker(&waker));
855                if !matches!(
856                    ready,
857                    Poll::Ready(WriteStatus::Ready | WriteStatus::Shutdown)
858                ) {
859                    return Poll::Ready(());
860                }
861            }
862        }
863        result
864    }
865
866    /// Get write buffer
867    fn with_write_buf_inner<F>(&self, f: F) -> Poll<()>
868    where
869        F: FnOnce(&BytesVec) -> Poll<io::Result<usize>>,
870    {
871        let inner = &self.0 .0;
872        let result = inner.buffer.with_write_destination(&self.0, |buf| {
873            let Some(buf) =
874                buf.and_then(|buf| if buf.is_empty() { None } else { Some(buf) })
875            else {
876                return Poll::Ready(Ok(0));
877            };
878
879            match ready!(f(buf)) {
880                Ok(0) => {
881                    log::trace!("{}: Disconnected during flush", self.tag());
882                    Poll::Ready(Err(io::Error::new(
883                        io::ErrorKind::WriteZero,
884                        "failed to write frame to transport",
885                    )))
886                }
887                Ok(n) => {
888                    if n == buf.len() {
889                        buf.clear();
890                        Poll::Ready(Ok(0))
891                    } else {
892                        buf.advance(n);
893                        Poll::Ready(Ok(buf.len()))
894                    }
895                }
896                Err(e) => Poll::Ready(Err(e)),
897            }
898        });
899
900        let mut flags = inner.flags.get();
901
902        let result = match result {
903            Poll::Pending => {
904                flags.remove(Flags::WR_PAUSED);
905                Poll::Pending
906            }
907            Poll::Ready(Ok(0)) => {
908                // all data has been written
909                flags.insert(Flags::WR_PAUSED);
910
911                if flags.is_task_waiting_for_write() {
912                    flags.task_waiting_for_write_is_done();
913                    inner.write_task.wake();
914                }
915
916                if flags.is_waiting_for_write() {
917                    flags.waiting_for_write_is_done();
918                    inner.dispatch_task.wake();
919                }
920                Poll::Ready(())
921            }
922            Poll::Ready(Ok(len)) => {
923                // if write buffer is smaller than high watermark value, turn off back-pressure
924                if flags.contains(Flags::BUF_W_BACKPRESSURE)
925                    && len < inner.pool.get().write_params_high() << 1
926                {
927                    flags.remove(Flags::BUF_W_BACKPRESSURE);
928                    inner.dispatch_task.wake();
929                }
930                Poll::Pending
931            }
932            Poll::Ready(Err(e)) => {
933                self.0 .0.io_stopped(Some(e));
934                Poll::Ready(())
935            }
936        };
937
938        inner.flags.set(flags);
939        result
940    }
941
942    fn shutdown_filters(&self) {
943        let io = &self.0;
944        let st = &self.0 .0;
945        if st.flags.get().contains(Flags::IO_STOPPING_FILTERS) {
946            let flags = st.flags.get();
947
948            if !flags.intersects(Flags::IO_STOPPED | Flags::IO_STOPPING) {
949                let filter = io.filter();
950                match filter.shutdown(io, &st.buffer, 0) {
951                    Ok(Poll::Ready(())) => {
952                        st.dispatch_task.wake();
953                        st.insert_flags(Flags::IO_STOPPING);
954                    }
955                    Ok(Poll::Pending) => {
956                        // check read buffer, if buffer is not consumed it is unlikely
957                        // that filter will properly complete shutdown
958                        if flags.contains(Flags::RD_PAUSED)
959                            || flags.contains(Flags::BUF_R_FULL | Flags::BUF_R_READY)
960                        {
961                            st.dispatch_task.wake();
962                            st.insert_flags(Flags::IO_STOPPING);
963                        }
964                    }
965                    Err(err) => {
966                        st.io_stopped(Some(err));
967                    }
968                }
969                if let Err(err) = filter.process_write_buf(io, &st.buffer, 0) {
970                    st.io_stopped(Some(err));
971                }
972            }
973        }
974    }
975}
976
977impl Clone for IoContext {
978    fn clone(&self) -> Self {
979        Self(self.0.clone())
980    }
981}