use std::{cell::Cell, fmt, future::poll_fn, io, task::ready, task::Context, task::Poll};

use ntex_bytes::{Buf, BufMut, BytesVec};
use ntex_util::{future::lazy, future::select, future::Either, time::sleep, time::Sleep};

use crate::{AsyncRead, AsyncWrite, Flags, IoRef, ReadStatus, WriteStatus};

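/// Context for the io read task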
pub struct ReadContext(IoRef, Cell<Option<Sleep>>);

impl fmt::Debug for ReadContext {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ReadContext").field("io", &self.0).finish()
    }
}

impl ReadContext {
    pub(crate) fn new(io: &IoRef) -> Self {
        Self(io.clone(), Cell::new(None))
    }

    #[doc(hidden)]
    #[inline]
    pub fn context(&self) -> IoContext {
        IoContext::new(&self.0)
    }

    #[inline]
    pub fn tag(&self) -> &'static str {
        self.0.tag()
    }

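    /// Wait until the io stream is closed or is being shut down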
    async fn wait_for_close(&self) {
        poll_fn(|cx| {
            let flags = self.0.flags();

            if flags.intersects(Flags::IO_STOPPING | Flags::IO_STOPPED) {
                Poll::Ready(())
            } else {
                self.0 .0.read_task.register(cx.waker());
                if flags.contains(Flags::IO_STOPPING_FILTERS) {
                    self.shutdown_filters(cx);
                }
                Poll::Pending
            }
        })
        .await
    }

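    /// Handle read io operations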
    pub async fn handle<T>(&self, io: &mut T)
    where
        T: AsyncRead,
    {
        let inner = &self.0 .0;

        loop {
            let result = poll_fn(|cx| self.0.filter().poll_read_ready(cx)).await;
            if result == ReadStatus::Terminate {
                log::trace!("{}: Read task is instructed to shutdown", self.tag());
                break;
            }

            let mut buf = if inner.flags.get().is_read_buf_ready() {
                inner.pool.get().get_read_buf()
            } else {
                inner
                    .buffer
                    .get_read_source()
                    .unwrap_or_else(|| inner.pool.get().get_read_buf())
            };

            let (hw, lw) = self.0.memory_pool().read_params().unpack();
            let remaining = buf.remaining_mut();
            if remaining <= lw {
                buf.reserve(hw - remaining);
            }
            let total = buf.len();

            let (buf, result) = match select(io.read(buf), self.wait_for_close()).await {
                Either::Left(res) => res,
                Either::Right(_) => {
                    log::trace!("{}: Read io is closed, stop read task", self.tag());
                    break;
                }
            };

            let total2 = buf.len();
            let nbytes = total2.saturating_sub(total);
            let total = total2;

            if let Some(mut first_buf) = inner.buffer.get_read_source() {
                first_buf.extend_from_slice(&buf);
                inner.buffer.set_read_source(&self.0, first_buf);
            } else {
                inner.buffer.set_read_source(&self.0, buf);
            }

            if nbytes > 0 {
                let filter = self.0.filter();
                let res = match filter.process_read_buf(&self.0, &inner.buffer, 0, nbytes) {
                    Ok(status) => {
                        if status.nbytes > 0 {
                            if hw < inner.buffer.read_destination_size() {
                                log::trace!(
                                    "{}: Io read buffer is too large {}, enable read back-pressure",
                                    self.0.tag(),
                                    total
                                );
                                inner.insert_flags(Flags::BUF_R_READY | Flags::BUF_R_FULL);
                            } else {
                                inner.insert_flags(Flags::BUF_R_READY);
                            }
                            log::trace!(
                                "{}: New {} bytes available, wakeup dispatcher",
                                self.0.tag(),
                                nbytes
                            );
                            inner.dispatch_task.wake();
                        } else if inner.flags.get().is_waiting_for_read() {
                            inner.dispatch_task.wake();
                        }

                        if status.need_write {
                            filter.process_write_buf(&self.0, &inner.buffer, 0)
                        } else {
                            Ok(())
                        }
                    }
                    Err(err) => Err(err),
                };

                if let Err(err) = res {
                    inner.dispatch_task.wake();
                    inner.io_stopped(Some(err));
                    inner.insert_flags(Flags::BUF_R_READY);
                }
            }

            match result {
                Ok(0) => {
                    log::trace!("{}: Tcp stream is disconnected", self.tag());
                    inner.io_stopped(None);
                    break;
                }
                Ok(_) => {
                    if inner.flags.get().contains(Flags::IO_STOPPING_FILTERS) {
                        lazy(|cx| self.shutdown_filters(cx)).await;
                    }
                }
                Err(err) => {
                    log::trace!("{}: Read task failed on io {:?}", self.tag(), err);
                    inner.io_stopped(Some(err));
                    break;
                }
            }
        }
    }

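    /// Drive filter shutdown, marking the io as stopping once filters
    /// complete or the disconnect timeout expires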
    fn shutdown_filters(&self, cx: &mut Context<'_>) {
        let st = &self.0 .0;
        let filter = self.0.filter();

        match filter.shutdown(&self.0, &st.buffer, 0) {
            Ok(Poll::Ready(())) => {
                st.dispatch_task.wake();
                st.insert_flags(Flags::IO_STOPPING);
            }
            Ok(Poll::Pending) => {
                let flags = st.flags.get();

                if flags.contains(Flags::RD_PAUSED)
                    || flags.contains(Flags::BUF_R_FULL | Flags::BUF_R_READY)
                {
                    st.dispatch_task.wake();
                    st.insert_flags(Flags::IO_STOPPING);
                } else {
                    let timeout = self
                        .1
                        .take()
                        .unwrap_or_else(|| sleep(st.disconnect_timeout.get()));
                    if timeout.poll_elapsed(cx).is_ready() {
                        st.dispatch_task.wake();
                        st.insert_flags(Flags::IO_STOPPING);
                    } else {
                        self.1.set(Some(timeout));
                    }
                }
            }
            Err(err) => {
                st.io_stopped(Some(err));
            }
        }
        if let Err(err) = filter.process_write_buf(&self.0, &st.buffer, 0) {
            st.io_stopped(Some(err));
        }
    }
}

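/// Context for the io write task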
#[derive(Debug)]
pub struct WriteContext(IoRef);

#[derive(Debug)]
pub struct WriteContextBuf {
    io: IoRef,
    buf: Option<BytesVec>,
}

impl WriteContext {
    pub(crate) fn new(io: &IoRef) -> Self {
        Self(io.clone())
    }

    #[inline]
    pub fn tag(&self) -> &'static str {
        self.0.tag()
    }

    async fn ready(&self) -> WriteStatus {
        poll_fn(|cx| self.0.filter().poll_write_ready(cx)).await
    }

    fn close(&self, err: Option<io::Error>) {
        self.0 .0.io_stopped(err);
    }

    async fn when_stopped(&self) {
        poll_fn(|cx| {
            if self.0.flags().is_stopped() {
                Poll::Ready(())
            } else {
                self.0 .0.write_task.register(cx.waker());
                Poll::Pending
            }
        })
        .await
    }

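    /// Handle write io operations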
    pub async fn handle<T>(&self, io: &mut T)
    where
        T: AsyncWrite,
    {
        let mut buf = WriteContextBuf {
            io: self.0.clone(),
            buf: None,
        };

        loop {
            match self.ready().await {
                WriteStatus::Ready => {
                    match select(io.write(&mut buf), self.when_stopped()).await {
                        Either::Left(Ok(_)) => continue,
                        Either::Left(Err(e)) => self.close(Some(e)),
                        Either::Right(_) => return,
                    }
                }
                WriteStatus::Shutdown => {
                    log::trace!("{}: Write task is instructed to shutdown", self.tag());

                    let fut = async {
                        io.write(&mut buf).await?;
                        io.flush().await?;
                        io.shutdown().await?;
                        Ok(())
                    };
                    match select(sleep(self.0 .0.disconnect_timeout.get()), fut).await {
                        Either::Left(_) => self.close(None),
                        Either::Right(res) => self.close(res.err()),
                    }
                }
                WriteStatus::Terminate => {
                    log::trace!("{}: Write task is instructed to terminate", self.tag());
                    self.close(io.shutdown().await.err());
                }
            }
            return;
        }
    }
}

impl WriteContextBuf {
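    /// Return an unwritten buffer back to the write context and update write flags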
    pub fn set(&mut self, mut buf: BytesVec) {
        if buf.is_empty() {
            self.io.memory_pool().release_write_buf(buf);
        } else if let Some(b) = self.buf.take() {
            buf.extend_from_slice(&b);
            self.io.memory_pool().release_write_buf(b);
            self.buf = Some(buf);
        } else if let Some(b) = self.io.0.buffer.set_write_destination(buf) {
            self.buf = Some(b);
        }

        let inner = &self.io.0;
        let len = self.buf.as_ref().map(|b| b.len()).unwrap_or_default()
            + inner.buffer.write_destination_size();
        let mut flags = inner.flags.get();

        if len == 0 {
            if flags.is_waiting_for_write() {
                flags.waiting_for_write_is_done();
                inner.dispatch_task.wake();
            }
            flags.insert(Flags::WR_PAUSED);
            inner.flags.set(flags);
        } else if flags.contains(Flags::BUF_W_BACKPRESSURE)
            && len < inner.pool.get().write_params_high() << 1
        {
            flags.remove(Flags::BUF_W_BACKPRESSURE);
            inner.flags.set(flags);
            inner.dispatch_task.wake();
        }
    }

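    /// Take the next buffer of data to be written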
    pub fn take(&mut self) -> Option<BytesVec> {
        if let Some(buf) = self.buf.take() {
            Some(buf)
        } else {
            self.io.0.buffer.get_write_destination()
        }
    }
}

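/// Low-level io context used to drive read and write operations directly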
pub struct IoContext(IoRef);

impl fmt::Debug for IoContext {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("IoContext").field("io", &self.0).finish()
    }
}

impl IoContext {
    pub(crate) fn new(io: &IoRef) -> Self {
        Self(io.clone())
    }

    #[inline]
    pub fn tag(&self) -> &'static str {
        self.0.tag()
    }

    #[doc(hidden)]
    pub fn flags(&self) -> crate::flags::Flags {
        self.0.flags()
    }

    #[inline]
    pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<ReadStatus> {
        self.shutdown_filters();
        self.0.filter().poll_read_ready(cx)
    }

    #[inline]
    pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<WriteStatus> {
        self.0.filter().poll_write_ready(cx)
    }

    #[inline]
    pub fn stopped(&self, e: Option<io::Error>) {
        self.0 .0.io_stopped(e);
    }

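    /// Wait for io stream shutdown; if `flush_buf` is set, also wait until
    /// the write buffer is flushed or the disconnect timeout expires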
    pub async fn shutdown(&self, flush_buf: bool) {
        let st = &self.0 .0;
        let mut timeout = None;

        poll_fn(|cx| {
            let flags = self.0.flags();

            if flags.intersects(Flags::IO_STOPPING | Flags::IO_STOPPED) {
                Poll::Ready(())
            } else {
                st.write_task.register(cx.waker());
                if flags.contains(Flags::IO_STOPPING_FILTERS) {
                    if timeout.is_none() {
                        timeout = Some(sleep(st.disconnect_timeout.get()));
                    }
                    if timeout.as_ref().unwrap().poll_elapsed(cx).is_ready() {
                        st.dispatch_task.wake();
                        st.insert_flags(Flags::IO_STOPPING);
                        return Poll::Ready(());
                    }
                }
                Poll::Pending
            }
        })
        .await;

        if flush_buf && !self.0.flags().contains(Flags::WR_PAUSED) {
            st.insert_flags(Flags::WR_TASK_WAIT);

            poll_fn(|cx| {
                let flags = self.0.flags();

                if flags.intersects(Flags::WR_PAUSED | Flags::IO_STOPPED) {
                    Poll::Ready(())
                } else {
                    st.write_task.register(cx.waker());

                    if timeout.is_none() {
                        timeout = Some(sleep(st.disconnect_timeout.get()));
                    }
                    if timeout.as_ref().unwrap().poll_elapsed(cx).is_ready() {
                        Poll::Ready(())
                    } else {
                        Poll::Pending
                    }
                }
            })
            .await;
        }
    }

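    /// Get a read buffer if the io stream is ready for reading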
    pub fn get_read_buf(&self) -> Poll<BytesVec> {
        let inner = &self.0 .0;

        if let Some(waker) = inner.read_task.take() {
            let mut cx = Context::from_waker(&waker);

            if let Poll::Ready(ReadStatus::Ready) = self.0.filter().poll_read_ready(&mut cx)
            {
                let mut buf = if inner.flags.get().is_read_buf_ready() {
                    inner.pool.get().get_read_buf()
                } else {
                    inner
                        .buffer
                        .get_read_source()
                        .unwrap_or_else(|| inner.pool.get().get_read_buf())
                };

                let (hw, lw) = self.0.memory_pool().read_params().unpack();
                let remaining = buf.remaining_mut();
                if remaining < lw {
                    buf.reserve(hw - remaining);
                }
                return Poll::Ready(buf);
            }
        }

        Poll::Pending
    }

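    /// Return the read buffer, appending its contents to any pending read data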
    pub fn release_read_buf(&self, buf: BytesVec) {
        let inner = &self.0 .0;
        if let Some(mut first_buf) = inner.buffer.get_read_source() {
            first_buf.extend_from_slice(&buf);
            inner.buffer.set_read_source(&self.0, first_buf);
        } else {
            inner.buffer.set_read_source(&self.0, buf);
        }
    }

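    /// Store the read buffer and handle the result of the read operation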
    pub fn set_read_buf(&self, result: io::Result<usize>, buf: BytesVec) -> Poll<()> {
        let inner = &self.0 .0;
        let (hw, _) = self.0.memory_pool().read_params().unpack();

        if let Some(mut first_buf) = inner.buffer.get_read_source() {
            first_buf.extend_from_slice(&buf);
            inner.buffer.set_read_source(&self.0, first_buf);
        } else {
            inner.buffer.set_read_source(&self.0, buf);
        }

        match result {
            Ok(0) => {
                inner.io_stopped(None);
                Poll::Ready(())
            }
            Ok(nbytes) => {
                let filter = self.0.filter();
                let res = filter
                    .process_read_buf(&self.0, &inner.buffer, 0, nbytes)
                    .and_then(|status| {
                        if status.nbytes > 0 {
                            if inner.buffer.read_destination_size() >= hw {
                                log::trace!(
                                    "{}: Io read buffer is too large {}, enable read back-pressure",
                                    self.0.tag(),
                                    nbytes
                                );
                                inner.insert_flags(Flags::BUF_R_READY | Flags::BUF_R_FULL);
                            } else {
                                inner.insert_flags(Flags::BUF_R_READY);

                                if nbytes >= hw {
                                    inner.read_task.wake();
                                }
                            }
                            log::trace!(
                                "{}: New {} bytes available, wakeup dispatcher",
                                self.0.tag(),
                                nbytes
                            );
                            inner.dispatch_task.wake();
                        } else {
                            if nbytes >= hw {
                                inner.read_task.wake();
                            }
                            if inner.flags.get().is_waiting_for_read() {
                                inner.dispatch_task.wake();
                            }
                        }

                        if status.need_write {
                            inner.write_task.wake();
                            filter.process_write_buf(&self.0, &inner.buffer, 0)
                        } else {
                            Ok(())
                        }
                    });

                if let Err(err) = res {
                    inner.io_stopped(Some(err));
                    Poll::Ready(())
                } else {
                    self.shutdown_filters();
                    Poll::Pending
                }
            }
            Err(e) => {
                inner.io_stopped(Some(e));
                Poll::Ready(())
            }
        }
    }

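    /// Get the write buffer if the io stream is ready for writing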
    pub fn get_write_buf(&self) -> Poll<BytesVec> {
        let inner = &self.0 .0;

        if let Some(waker) = inner.write_task.take() {
            let ready = self
                .0
                .filter()
                .poll_write_ready(&mut Context::from_waker(&waker));
            let buf = if matches!(
                ready,
                Poll::Ready(WriteStatus::Ready | WriteStatus::Shutdown)
            ) {
                inner.buffer.get_write_destination().and_then(|buf| {
                    if buf.is_empty() {
                        None
                    } else {
                        Some(buf)
                    }
                })
            } else {
                None
            };

            if let Some(buf) = buf {
                return Poll::Ready(buf);
            }
        }
        Poll::Pending
    }

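    /// Return the write buffer, merging it with any data queued in the meantime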
    pub fn release_write_buf(&self, mut buf: BytesVec) {
        let inner = &self.0 .0;

        if let Some(b) = inner.buffer.get_write_destination() {
            buf.extend_from_slice(&b);
            self.0.memory_pool().release_write_buf(b);
        }
        inner.buffer.set_write_destination(buf);

        let len = inner.buffer.write_destination_size();
        let mut flags = inner.flags.get();

        if len == 0 {
            if flags.is_waiting_for_write() {
                flags.waiting_for_write_is_done();
                inner.dispatch_task.wake();
            }
            flags.insert(Flags::WR_PAUSED);
            inner.flags.set(flags);
        } else if flags.contains(Flags::BUF_W_BACKPRESSURE)
            && len < inner.pool.get().write_params_high() << 1
        {
            flags.remove(Flags::BUF_W_BACKPRESSURE);
            inner.flags.set(flags);
            inner.dispatch_task.wake();
        }
    }

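    /// Store the write buffer and handle the result of the write operation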
    pub fn set_write_buf(&self, result: io::Result<usize>, mut buf: BytesVec) -> Poll<()> {
        let result = match result {
            Ok(0) => {
                log::trace!("{}: Disconnected during flush", self.tag());
                Err(io::Error::new(
                    io::ErrorKind::WriteZero,
                    "failed to write frame to transport",
                ))
            }
            Ok(n) => {
                if n == buf.len() {
                    buf.clear();
                    Ok(0)
                } else {
                    buf.advance(n);
                    Ok(buf.len())
                }
            }
            Err(e) => Err(e),
        };

        let inner = &self.0 .0;

        let result = match result {
            Ok(0) => {
                self.0.memory_pool().release_write_buf(buf);
                Ok(inner.buffer.write_destination_size())
            }
            Ok(_) => {
                if let Some(b) = inner.buffer.get_write_destination() {
                    buf.extend_from_slice(&b);
                    self.0.memory_pool().release_write_buf(b);
                }
                let l = buf.len();
                inner.buffer.set_write_destination(buf);
                Ok(l)
            }
            Err(e) => Err(e),
        };

        let mut flags = inner.flags.get();
        match result {
            Ok(0) => {
                flags.insert(Flags::WR_PAUSED);

                if flags.is_task_waiting_for_write() {
                    flags.task_waiting_for_write_is_done();
                    inner.write_task.wake();
                }

                if flags.is_waiting_for_write() {
                    flags.waiting_for_write_is_done();
                    inner.dispatch_task.wake();
                }
                inner.flags.set(flags);
                Poll::Ready(())
            }
            Ok(len) => {
                if flags.contains(Flags::BUF_W_BACKPRESSURE)
                    && len < inner.pool.get().write_params_high() << 1
                {
                    flags.remove(Flags::BUF_W_BACKPRESSURE);
                    inner.flags.set(flags);
                    inner.dispatch_task.wake();
                }
                Poll::Pending
            }
            Err(e) => {
                inner.io_stopped(Some(e));
                Poll::Ready(())
            }
        }
    }

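    /// Run the provided closure against the read buffer and handle its result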
    pub fn with_read_buf<F>(&self, f: F) -> Poll<()>
    where
        F: FnOnce(&mut BytesVec) -> Poll<io::Result<usize>>,
    {
        let result = self.with_read_buf_inner(f);

        if result.is_pending() {
            if let Some(waker) = self.0 .0.read_task.take() {
                let mut cx = Context::from_waker(&waker);

                if let Poll::Ready(ReadStatus::Ready) =
                    self.0.filter().poll_read_ready(&mut cx)
                {
                    return Poll::Pending;
                }
            }
        }
        result
    }

    fn with_read_buf_inner<F>(&self, f: F) -> Poll<()>
    where
        F: FnOnce(&mut BytesVec) -> Poll<io::Result<usize>>,
    {
        let inner = &self.0 .0;
        let (hw, lw) = self.0.memory_pool().read_params().unpack();
        let result = inner.buffer.with_read_source(&self.0, |buf| {
            let remaining = buf.remaining_mut();
            if remaining < lw {
                buf.reserve(hw - remaining);
            }

            f(buf)
        });

        match result {
            Poll::Ready(Ok(0)) => {
                inner.io_stopped(None);
                Poll::Ready(())
            }
            Poll::Ready(Ok(nbytes)) => {
                let filter = self.0.filter();
                let _ = filter
                    .process_read_buf(&self.0, &inner.buffer, 0, nbytes)
                    .and_then(|status| {
                        if status.nbytes > 0 {
                            if inner.buffer.read_destination_size() >= hw {
                                log::trace!(
                                    "{}: Io read buffer is too large {}, enable read back-pressure",
                                    self.0.tag(),
                                    nbytes
                                );
                                inner.insert_flags(Flags::BUF_R_READY | Flags::BUF_R_FULL);
                            } else {
                                inner.insert_flags(Flags::BUF_R_READY);

                                if nbytes >= hw {
                                    inner.read_task.wake();
                                }
                            }
                            log::trace!(
                                "{}: New {} bytes available, wakeup dispatcher",
                                self.0.tag(),
                                nbytes
                            );
                            inner.dispatch_task.wake();
                        } else {
                            if nbytes >= hw {
                                inner.read_task.wake();
                            }
                            if inner.flags.get().is_waiting_for_read() {
                                inner.dispatch_task.wake();
                            }
                        }

                        if status.need_write {
                            filter.process_write_buf(&self.0, &inner.buffer, 0)
                        } else {
                            Ok(())
                        }
                    })
                    .map_err(|err| {
                        inner.dispatch_task.wake();
                        inner.io_stopped(Some(err));
                        inner.insert_flags(Flags::BUF_R_READY);
                    });
                Poll::Pending
            }
            Poll::Ready(Err(e)) => {
                inner.io_stopped(Some(e));
                Poll::Ready(())
            }
            Poll::Pending => {
                self.shutdown_filters();
                Poll::Pending
            }
        }
    }

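    /// Run the provided closure against the write buffer and handle its result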
    pub fn with_write_buf<F>(&self, f: F) -> Poll<()>
    where
        F: FnOnce(&BytesVec) -> Poll<io::Result<usize>>,
    {
        let result = self.with_write_buf_inner(f);

        if result.is_pending() {
            let inner = &self.0 .0;
            if let Some(waker) = inner.write_task.take() {
                let ready = self
                    .0
                    .filter()
                    .poll_write_ready(&mut Context::from_waker(&waker));
                if !matches!(
                    ready,
                    Poll::Ready(WriteStatus::Ready | WriteStatus::Shutdown)
                ) {
                    return Poll::Ready(());
                }
            }
        }
        result
    }

    fn with_write_buf_inner<F>(&self, f: F) -> Poll<()>
    where
        F: FnOnce(&BytesVec) -> Poll<io::Result<usize>>,
    {
        let inner = &self.0 .0;
        let result = inner.buffer.with_write_destination(&self.0, |buf| {
            let Some(buf) =
                buf.and_then(|buf| if buf.is_empty() { None } else { Some(buf) })
            else {
                return Poll::Ready(Ok(0));
            };

            match ready!(f(buf)) {
                Ok(0) => {
                    log::trace!("{}: Disconnected during flush", self.tag());
                    Poll::Ready(Err(io::Error::new(
                        io::ErrorKind::WriteZero,
                        "failed to write frame to transport",
                    )))
                }
                Ok(n) => {
                    if n == buf.len() {
                        buf.clear();
                        Poll::Ready(Ok(0))
                    } else {
                        buf.advance(n);
                        Poll::Ready(Ok(buf.len()))
                    }
                }
                Err(e) => Poll::Ready(Err(e)),
            }
        });

        let mut flags = inner.flags.get();

        let result = match result {
            Poll::Pending => {
                flags.remove(Flags::WR_PAUSED);
                Poll::Pending
            }
            Poll::Ready(Ok(0)) => {
                flags.insert(Flags::WR_PAUSED);

                if flags.is_task_waiting_for_write() {
                    flags.task_waiting_for_write_is_done();
                    inner.write_task.wake();
                }

                if flags.is_waiting_for_write() {
                    flags.waiting_for_write_is_done();
                    inner.dispatch_task.wake();
                }
                Poll::Ready(())
            }
            Poll::Ready(Ok(len)) => {
                if flags.contains(Flags::BUF_W_BACKPRESSURE)
                    && len < inner.pool.get().write_params_high() << 1
                {
                    flags.remove(Flags::BUF_W_BACKPRESSURE);
                    inner.dispatch_task.wake();
                }
                Poll::Pending
            }
            Poll::Ready(Err(e)) => {
                self.0 .0.io_stopped(Some(e));
                Poll::Ready(())
            }
        };

        inner.flags.set(flags);
        result
    }

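    /// Drive filter shutdown while the io stream is stopping filters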
    fn shutdown_filters(&self) {
        let io = &self.0;
        let st = &self.0 .0;
        if st.flags.get().contains(Flags::IO_STOPPING_FILTERS) {
            let flags = st.flags.get();

            if !flags.intersects(Flags::IO_STOPPED | Flags::IO_STOPPING) {
                let filter = io.filter();
                match filter.shutdown(io, &st.buffer, 0) {
                    Ok(Poll::Ready(())) => {
                        st.dispatch_task.wake();
                        st.insert_flags(Flags::IO_STOPPING);
                    }
                    Ok(Poll::Pending) => {
                        if flags.contains(Flags::RD_PAUSED)
                            || flags.contains(Flags::BUF_R_FULL | Flags::BUF_R_READY)
                        {
                            st.dispatch_task.wake();
                            st.insert_flags(Flags::IO_STOPPING);
                        }
                    }
                    Err(err) => {
                        st.io_stopped(Some(err));
                    }
                }
                if let Err(err) = filter.process_write_buf(io, &st.buffer, 0) {
                    st.io_stopped(Some(err));
                }
            }
        }
    }
}

impl Clone for IoContext {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}