1use std::cell::{Cell, UnsafeCell};
2use std::future::{Future, poll_fn};
3use std::task::{Context, Poll};
4use std::{fmt, hash, io, marker, mem, ops, pin::Pin, ptr, rc::Rc};
5
6use ntex_bytes::{BytePageSize, BytesMut};
7use ntex_codec::{Decoder, Encoder};
8use ntex_service::cfg::{Cfg, SharedCfg};
9use ntex_util::{future::Either, task::LocalWaker, time::Sleep};
10
11use crate::buf::Stack;
12use crate::cfg::IoConfig;
13use crate::ctx::IoContext;
14use crate::filter::{Base, Filter, Layer};
15use crate::filterptr::FilterPtr;
16use crate::flags::Flags;
17use crate::ops::{Id, IoManager, TimerHandle};
18use crate::seal::{IoBoxed, Sealed};
19use crate::{Decoded, FilterLayer, Handle, IoStatusUpdate, IoStream, RecvError};
20
/// Owning handle to an io object, parameterized by its filter type `F`
/// (defaults to `Base`).
///
/// The wrapped `IoRef` lives in an `UnsafeCell` so that it can be swapped out
/// through a shared reference (see `take_io_ref`); `F` is carried only as
/// `PhantomData`.
pub struct Io<F = Base>(UnsafeCell<IoRef>, marker::PhantomData<F>);
23
/// Cloneable handle to the shared io state.
///
/// Cloning only clones the inner `Rc`, so all clones refer to the same `IoState`.
#[derive(Clone)]
pub struct IoRef(pub(super) Rc<IoState>);
26
/// Shared state behind every `Io` / `IoRef` handle.
///
/// All mutable fields use interior mutability (`Cell`, `LocalWaker`, ...), so the
/// state is mutated through shared references.
pub(crate) struct IoState {
    /// Pointer to the currently installed filter (may be unset, see `FilterPtr::null`).
    filter: FilterPtr,
    /// Identifier assigned by `IoManager::register`.
    pub(super) id: Cell<Id>,
    /// Io configuration; also provides the log tag and buffer limits.
    pub(super) cfg: Cfg<IoConfig>,
    /// State flags of the io object.
    pub(super) flags: Flags,
    /// Error recorded when the connection was terminated, if any.
    pub(super) error: Cell<Option<io::Error>>,
    /// Waker slot of the read task.
    pub(super) read_task: LocalWaker,
    /// Waker slot of the write task.
    pub(super) write_task: LocalWaker,
    /// Waker slot of the dispatcher task.
    dispatch_task: LocalWaker,
    /// Read/write buffers for the filter layers.
    pub(super) buffer: Stack,
    /// Handle returned by `IoStream::start`; dropped when the connection is terminated.
    pub(super) handle: Cell<Option<Box<dyn Handle>>>,
    /// Timer handle (stopped when the `Io` is dropped).
    pub(super) timeout: Cell<TimerHandle>,
    /// Sleep used for the shutdown timeout — presumably driven by the io tasks; not used in this file.
    pub(super) shutdown_timeout: Cell<Option<Sleep>>,
    /// Wakers of `OnDisconnect` futures; lazily allocated, taken and woken on disconnect.
    #[allow(clippy::box_collection)]
    pub(super) on_disconnect: Cell<Option<Box<Vec<LocalWaker>>>>,
}
43
44impl IoState {
45 pub(super) fn id(&self) -> Id {
46 self.id.get()
47 }
48
49 pub(super) fn tag(&self) -> &'static str {
50 self.cfg.tag()
51 }
52
53 pub(super) fn filter(&self) -> &dyn Filter {
54 self.filter.get()
55 }
56
57 pub(super) fn notify_timeout(&self) {
58 if self.flags.check_dispatcher_timeout_unset() {
59 self.wake_dispatch_task();
60 log::trace!("{}: Timer, notify dispatcher", self.cfg.tag());
61 }
62 }
63
64 pub(super) fn notify_disconnect(&self) {
65 if let Some(on_disconnect) = self.on_disconnect.take() {
66 for item in on_disconnect.into_iter() {
67 item.wake();
68 }
69 }
70 }
71
72 pub(super) fn error(&self) -> Option<io::Error> {
74 if let Some(err) = self.error.take() {
75 self.error
76 .set(Some(io::Error::new(err.kind(), format!("{err}"))));
77 Some(err)
78 } else {
79 None
80 }
81 }
82
83 pub(super) fn error_or_disconnected(&self) -> io::Error {
85 self.error()
86 .unwrap_or_else(|| io::Error::new(io::ErrorKind::NotConnected, "Disconnected"))
87 }
88
89 pub(super) fn filters_stopped(&self) {
90 self.wake_read_task();
91 self.wake_write_task();
92 self.wake_dispatch_task();
93 self.flags.set_filters_stopped();
94 }
95
96 pub(super) fn terminate_connection(&self, err: Option<io::Error>) {
97 if !self.flags.is_terminated() {
98 log::trace!("{}: Terminate io with error {:?}", self.cfg.tag(), err);
99 if err.is_some() {
100 self.error.set(err);
101 }
102 self.flags.set_terminate();
103 self.wake_read_task();
104 self.wake_write_task();
105 self.wake_dispatch_task();
106 self.notify_disconnect();
107 self.handle.take();
108 }
109 }
110
111 pub(super) fn start_shutdown(&self) {
113 if !self.flags.is_stopping_any() {
114 log::trace!("{}: Initiate io shutdown {:?}", self.cfg.tag(), self.flags);
115 self.flags.set_filter_stopping();
116 self.wake_read_task();
117 self.wake_write_task();
118 }
119 }
120
121 pub(super) fn get_read_buf(&self) -> BytesMut {
122 self.cfg.read_buf().get()
123 }
124
125 pub(super) fn is_rd_backpressure_needed(&self, size: usize) -> bool {
126 size >= self.cfg.read_buf().high
127 }
128
129 pub(super) fn is_wr_backpressure_needed(&self, size: usize) -> bool {
130 size >= self.cfg.write_buf().high
131 }
132
133 pub(super) fn should_disable_wr_backpressure(&self, size: usize) -> bool {
134 size <= self.cfg.write_buf().half
135 }
136
137 pub(super) fn wake_read_task(&self) {
138 self.read_task.wake();
139 }
140
141 pub(super) fn wake_write_task(&self) {
142 #[cfg(feature = "trace")]
143 log::trace!("{}: Wake write task, flags:{:?}", self.tag(), self.flags);
144 self.write_task.wake();
145 }
146
147 pub(super) fn wake_dispatch_task(&self) {
148 self.dispatch_task.wake();
149 }
150}
151
// Equality is identity-based (see the `PartialEq` impl), hence a full equivalence relation.
impl Eq for IoState {}
153
154impl PartialEq for IoState {
155 #[inline]
156 fn eq(&self, other: &Self) -> bool {
157 ptr::eq(self, other)
158 }
159}
160
161impl hash::Hash for IoState {
162 #[inline]
163 fn hash<H: hash::Hasher>(&self, state: &mut H) {
164 (ptr::from_ref(self) as usize).hash(state);
165 }
166}
167
168impl fmt::Debug for IoState {
169 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
170 let err = self.error.take();
171 let res = f
172 .debug_struct("IoState")
173 .field("id", &self.id)
174 .field("flags", &self.flags)
175 .field("filter", &self.filter.is_set())
176 .field("timeout", &self.timeout)
177 .field("error", &err)
178 .field("buffer", &self.buffer)
179 .field("cfg", &self.cfg)
180 .finish();
181 self.error.set(err);
182 res
183 }
184}
185
186impl Io {
187 pub fn new<I: IoStream, T: Into<SharedCfg>>(io: I, cfg: T) -> Self {
189 let cfg = cfg.into().get::<IoConfig>();
190 let size = cfg.write_page_size();
191 let flags = Flags::new(cfg.write_buf_threshold() > 0);
192
193 let inner = Rc::new(IoState {
194 cfg,
195 flags,
196 id: Cell::new(Id::default()),
197 filter: FilterPtr::null(),
198 error: Cell::new(None),
199 dispatch_task: LocalWaker::new(),
200 read_task: LocalWaker::new(),
201 write_task: LocalWaker::new(),
202 buffer: Stack::new(size),
203 handle: Cell::new(None),
204 timeout: Cell::new(TimerHandle::default()),
205 shutdown_timeout: Cell::new(None),
206 on_disconnect: Cell::new(None),
207 });
208 inner.filter.set(Base::new(IoRef(inner.clone())));
209
210 let ioref = IoRef(inner);
211 ioref.0.id.set(IoManager::register(&ioref));
212
213 let hnd = io.start(IoContext::new(ioref.clone()));
215 ioref.0.handle.set(Some(hnd));
216
217 Io(UnsafeCell::new(ioref), marker::PhantomData)
218 }
219}
220
221impl<I: IoStream> From<I> for Io {
222 #[inline]
223 fn from(io: I) -> Io {
224 Io::new(io, SharedCfg::default())
225 }
226}
227
228impl IoRef {
229 fn create_empty() -> IoRef {
230 IoRef(Rc::new(IoState {
231 id: Cell::new(Id::default()),
232 cfg: SharedCfg::default().get::<IoConfig>(),
233 filter: FilterPtr::null(),
234 flags: Flags::new_stopped(),
235 error: Cell::new(None),
236 dispatch_task: LocalWaker::new(),
237 read_task: LocalWaker::new(),
238 write_task: LocalWaker::new(),
239 buffer: Stack::new(BytePageSize::Size16),
240 handle: Cell::new(None),
241 timeout: Cell::new(TimerHandle::default()),
242 shutdown_timeout: Cell::new(None),
243 on_disconnect: Cell::new(None),
244 }))
245 }
246}
247
248impl<F> Io<F> {
249 #[inline]
250 pub fn get_ref(&self) -> IoRef {
252 self.io_ref().clone()
253 }
254
255 #[inline]
256 #[must_use]
257 pub fn take(&self) -> Self {
261 Self(UnsafeCell::new(self.take_io_ref()), marker::PhantomData)
262 }
263
264 fn take_io_ref(&self) -> IoRef {
265 unsafe { mem::replace(&mut *self.0.get(), IoRef::create_empty()) }
266 }
267
268 fn st(&self) -> &IoState {
269 unsafe { &(*self.0.get()).0 }
270 }
271
272 fn io_ref(&self) -> &IoRef {
273 unsafe { &*self.0.get() }
274 }
275
276 #[inline]
277 pub fn set_config<T: Into<SharedCfg>>(&self, cfg: T) {
279 unsafe {
280 let cfg = cfg.into().get::<IoConfig>();
281 self.st().buffer.set_page_size(cfg.write_page_size());
282 self.st().cfg.replace(cfg);
283 }
284 }
285}
286
287impl<F: FilterLayer, T: Filter> Io<Layer<F, T>> {
288 #[inline]
289 pub fn filter(&self) -> &F {
291 &self.st().filter.filter::<Layer<F, T>>().0
292 }
293}
294
295impl<F: Filter> Io<F> {
296 #[inline]
297 pub fn seal(self) -> Io<Sealed> {
299 let state = self.take_io_ref();
300 state.0.filter.seal::<F>();
301
302 Io(UnsafeCell::new(state), marker::PhantomData)
303 }
304
305 #[inline]
306 pub fn boxed(self) -> IoBoxed {
308 self.seal().into()
309 }
310
311 #[inline]
312 pub fn add_filter<U>(self, nf: U) -> Io<Layer<U, F>>
314 where
315 U: FilterLayer,
316 {
317 if let Err(e) = self.st().buffer.process_write_buf(&self) {
320 self.st().terminate_connection(Some(e));
321 }
322
323 let state = self.take_io_ref();
324
325 unsafe { &mut *(Rc::as_ptr(&state.0).cast_mut()) }
330 .buffer
331 .add_layer(self.st().cfg.write_page_size());
332
333 state.0.filter.add_filter::<F, U>(nf);
335
336 if let Err(e) = self.st().buffer.process_read_buf(&self, 0) {
337 self.st().terminate_connection(Some(e));
338 }
339
340 Io(UnsafeCell::new(state), marker::PhantomData)
341 }
342
343 pub fn map_filter<U, R>(self, f: U) -> Io<R>
345 where
346 U: FnOnce(F) -> R,
347 R: Filter,
348 {
349 if let Err(e) = self.st().buffer.process_write_buf(&self) {
352 self.st().terminate_connection(Some(e));
353 }
354
355 let state = self.take_io_ref();
356 state.0.filter.map_filter::<F, U, R>(f);
357
358 Io(UnsafeCell::new(state), marker::PhantomData)
359 }
360}
361
362impl<F> Io<F> {
363 pub async fn recv<U>(
365 &self,
366 codec: &U,
367 ) -> Result<Option<U::Item>, Either<U::Error, io::Error>>
368 where
369 U: Decoder,
370 {
371 loop {
372 return match poll_fn(|cx| self.poll_recv(codec, cx)).await {
373 Ok(item) => Ok(Some(item)),
374 Err(RecvError::KeepAlive) => Err(Either::Right(io::Error::new(
375 io::ErrorKind::TimedOut,
376 "Timeout",
377 ))),
378 Err(RecvError::WriteBackpressure) => {
379 poll_fn(|cx| self.poll_flush(cx, false))
380 .await
381 .map_err(Either::Right)?;
382 continue;
383 }
384 Err(RecvError::Decoder(err)) => Err(Either::Left(err)),
385 Err(RecvError::PeerGone(Some(err))) => Err(Either::Right(err)),
386 Err(RecvError::PeerGone(None)) => Ok(None),
387 };
388 }
389 }
390
391 pub async fn read(&self, dst: &mut [u8]) -> io::Result<()> {
395 loop {
396 let completed = self.with_read_buf(|buf| {
397 if buf.len() >= dst.len() {
398 let _ = io::Read::read(buf, dst).expect("Cannot fail");
399 true
400 } else {
401 false
402 }
403 });
404 if completed {
405 return Ok(());
406 }
407 self.read_ready().await?;
408 }
409 }
410
411 #[inline]
412 pub async fn read_ready(&self) -> io::Result<Option<()>> {
414 poll_fn(|cx| self.poll_read_ready(cx)).await
415 }
416
417 #[inline]
418 pub async fn read_notify(&self) -> io::Result<Option<()>> {
420 poll_fn(|cx| self.poll_read_notify(cx)).await
421 }
422
423 #[inline]
424 pub fn pause(&self) {
426 let st = self.st();
427 if !st.flags.is_read_paused() {
428 st.wake_read_task();
429 st.flags.set_read_paused();
430 }
431 }
432
433 #[inline]
434 pub async fn send<U>(
436 &self,
437 item: U::Item,
438 codec: &U,
439 ) -> Result<(), Either<U::Error, io::Error>>
440 where
441 U: Encoder,
442 {
443 self.encode(item, codec).map_err(Either::Left)?;
444
445 poll_fn(|cx| self.poll_flush(cx, true))
446 .await
447 .map_err(Either::Right)?;
448
449 Ok(())
450 }
451
452 #[inline]
453 pub async fn flush(&self, full: bool) -> io::Result<()> {
457 poll_fn(|cx| self.poll_flush(cx, full)).await
458 }
459
460 #[inline]
461 pub async fn shutdown(&self) -> io::Result<()> {
463 poll_fn(|cx| self.poll_shutdown(cx)).await
464 }
465
466 #[inline]
467 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Option<()>>> {
482 let st = self.st();
483
484 if st.flags.is_closed() {
485 Poll::Ready(Ok(None))
486 } else {
487 let ready = st.flags.is_read_ready();
488
489 if st.flags.is_read_paused_or_backpressure() {
492 st.flags.unset_all_read_flags();
493 st.flags.unset_read_paused();
494 st.wake_read_task();
495 if ready {
496 Poll::Ready(Ok(Some(())))
497 } else {
498 st.dispatch_task.register(cx.waker());
499 Poll::Pending
500 }
501 } else if ready {
502 Poll::Ready(Ok(Some(())))
503 } else {
504 if st.flags.is_read_paused() {
505 st.wake_read_task();
506 st.flags.unset_read_paused();
507 }
508 st.dispatch_task.register(cx.waker());
509 Poll::Pending
510 }
511 }
512 }
513
514 #[inline]
515 pub fn poll_read_notify(&self, cx: &mut Context<'_>) -> Poll<io::Result<Option<()>>> {
517 let st = self.st();
518 if st.flags.is_stopping() {
519 Poll::Ready(Ok(None))
520 } else if st.flags.check_read_notifed() {
521 Poll::Ready(Ok(Some(())))
522 } else {
523 st.flags.set_read_notify();
524 if self.poll_read_ready(cx).is_ready() {
525 st.dispatch_task.register(cx.waker());
526 }
527 st.dispatch_task.register(cx.waker());
528 Poll::Pending
529 }
530 }
531
532 #[inline]
533 pub fn poll_recv<U>(
538 &self,
539 codec: &U,
540 cx: &mut Context<'_>,
541 ) -> Poll<Result<U::Item, RecvError<U>>>
542 where
543 U: Decoder,
544 {
545 let decoded = self.poll_recv_decode(codec, cx)?;
546
547 if let Some(item) = decoded.item {
548 Poll::Ready(Ok(item))
549 } else {
550 Poll::Pending
551 }
552 }
553
554 #[inline]
555 pub fn poll_recv_decode<U>(
560 &self,
561 codec: &U,
562 cx: &mut Context<'_>,
563 ) -> Result<Decoded<U::Item>, RecvError<U>>
564 where
565 U: Decoder,
566 {
567 let st = self.st();
568 st.flags.unset_read_ready();
569
570 let decoded = self
571 .decode_item(codec)
572 .map_err(|err| RecvError::Decoder(err))?;
573
574 if decoded.item.is_some() {
575 Ok(decoded)
576 } else if st.flags.is_stopping() {
577 Err(RecvError::PeerGone(st.error()))
578 } else if st.flags.check_dispatcher_timeout() {
579 Err(RecvError::KeepAlive)
580 } else if st.flags.is_wr_backpressure() {
581 Err(RecvError::WriteBackpressure)
582 } else {
583 match self.poll_read_ready(cx) {
584 Poll::Pending | Poll::Ready(Ok(Some(()))) => {
585 #[cfg(feature = "trace")]
586 if decoded.remains != 0 {
587 log::trace!("{}: Not enough data to decode next frame", self.tag());
588 }
589 Ok(decoded)
590 }
591 Poll::Ready(Err(e)) => Err(RecvError::PeerGone(Some(e))),
592 Poll::Ready(Ok(None)) => Err(RecvError::PeerGone(None)),
593 }
594 }
595 }
596
597 #[inline]
598 pub fn poll_flush(&self, cx: &mut Context<'_>, full: bool) -> Poll<io::Result<()>> {
603 let st = self.st();
604
605 st.buffer.process_write_buf_force(self)?;
607 self.consolidate_write_state(false);
608
609 let len = st.buffer.write_buf_size();
610 if len > 0 {
611 if st.flags.is_closed() {
612 return Poll::Ready(Err(st.error_or_disconnected()));
613 } else if full {
614 st.flags.set_wants_write_flush();
615 st.dispatch_task.register(cx.waker());
616 return Poll::Pending;
617 } else if st.is_wr_backpressure_needed(len) {
618 st.flags.set_wr_backpressure();
619 st.dispatch_task.register(cx.waker());
620 return Poll::Pending;
621 }
622 }
623 if st.flags.is_closed() && !st.flags.is_write_flush() {
624 Poll::Ready(Err(st.error_or_disconnected()))
625 } else {
626 st.flags.unset_wr_backpressure_and_flush();
627 Poll::Ready(Ok(()))
628 }
629 }
630
631 #[inline]
632 pub fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
634 let st = self.st();
635
636 if st.flags.is_stopping() {
637 if let Some(err) = st.error() {
638 Poll::Ready(Err(err))
639 } else {
640 Poll::Ready(Ok(()))
641 }
642 } else {
643 if !st.flags.is_stopping_filters() {
644 st.start_shutdown();
645 }
646 st.flags.unset_all_read_flags();
647 st.flags.unset_read_paused();
648
649 st.wake_read_task();
650 st.dispatch_task.register(cx.waker());
651 Poll::Pending
652 }
653 }
654
655 #[inline]
656 pub fn poll_read_pause(&self, cx: &mut Context<'_>) -> Poll<IoStatusUpdate> {
660 self.pause();
661 self.poll_status_update(cx)
662 }
663
664 #[inline]
665 pub fn poll_status_update(&self, cx: &mut Context<'_>) -> Poll<IoStatusUpdate> {
667 let st = self.st();
668 st.dispatch_task.register(cx.waker());
669 if st.flags.is_closed() {
670 Poll::Ready(IoStatusUpdate::PeerGone(st.error()))
671 } else if st.flags.check_dispatcher_timeout() {
672 Poll::Ready(IoStatusUpdate::KeepAlive)
673 } else if st.flags.is_wr_backpressure() {
674 if st.should_disable_wr_backpressure(st.buffer.write_buf_size()) {
676 st.flags.unset_wr_backpressure();
677 }
678 Poll::Ready(IoStatusUpdate::WriteBackpressure)
679 } else {
680 Poll::Pending
681 }
682 }
683
684 #[inline]
685 pub fn poll_dispatch(&self, cx: &mut Context<'_>) {
687 self.st().dispatch_task.register(cx.waker());
688 }
689}
690
691impl<F> AsRef<IoRef> for Io<F> {
692 #[inline]
693 fn as_ref(&self) -> &IoRef {
694 self.io_ref()
695 }
696}
697
// Equality is delegated to `IoRef` (see the `PartialEq` impl); this marks it as a full equivalence.
impl<F> Eq for Io<F> {}
699
700impl<F> PartialEq for Io<F> {
701 #[inline]
702 fn eq(&self, other: &Self) -> bool {
703 self.io_ref().eq(other.io_ref())
704 }
705}
706
707impl<F> hash::Hash for Io<F> {
708 #[inline]
709 fn hash<H: hash::Hasher>(&self, state: &mut H) {
710 self.io_ref().hash(state);
711 }
712}
713
714impl<F> fmt::Debug for Io<F> {
715 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
716 f.debug_struct("Io").field("state", self.st()).finish()
717 }
718}
719
720impl<F> ops::Deref for Io<F> {
721 type Target = IoRef;
722
723 #[inline]
724 fn deref(&self) -> &Self::Target {
725 self.io_ref()
726 }
727}
728
729impl<F> Drop for Io<F> {
730 fn drop(&mut self) {
731 let st = self.st();
732 self.stop_timer();
733
734 if st.filter.is_set() {
735 if !st.flags.is_terminated() {
738 log::trace!("{}: Io is dropped, terminate connection", st.tag());
739 }
740
741 st.terminate_connection(None);
742 st.filter.drop_filter::<F>();
743 }
744
745 IoManager::unregister(self.io_ref());
746 }
747}
748
/// Future that resolves once the io object is disconnected.
#[derive(Debug)]
#[must_use = "OnDisconnect do nothing unless polled"]
pub struct OnDisconnect {
    /// Index of this future's waker in `IoState::on_disconnect`;
    /// `usize::MAX` when the io was already stopping at creation time.
    token: usize,
    /// Shared io state that is observed.
    inner: Rc<IoState>,
}
756
757impl OnDisconnect {
758 pub(super) fn new(inner: Rc<IoState>) -> Self {
759 Self::new_inner(inner.flags.is_stopping(), inner)
760 }
761
762 fn new_inner(disconnected: bool, inner: Rc<IoState>) -> Self {
763 let token = if disconnected {
764 usize::MAX
765 } else {
766 let mut on_disconnect = inner.on_disconnect.take();
767 let token = if let Some(ref mut on_disconnect) = on_disconnect {
768 let token = on_disconnect.len();
769 on_disconnect.push(LocalWaker::default());
770 token
771 } else {
772 on_disconnect = Some(Box::new(vec![LocalWaker::default()]));
773 0
774 };
775 inner.on_disconnect.set(on_disconnect);
776 token
777 };
778 Self { token, inner }
779 }
780
781 #[inline]
782 pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
784 if self.token == usize::MAX || self.inner.flags.is_stopping() {
785 Poll::Ready(())
786 } else if let Some(on_disconnect) = self.inner.on_disconnect.take() {
787 on_disconnect[self.token].register(cx.waker());
788 self.inner.on_disconnect.set(Some(on_disconnect));
789 Poll::Pending
790 } else {
791 Poll::Ready(())
792 }
793 }
794}
795
796impl Clone for OnDisconnect {
797 fn clone(&self) -> Self {
798 if self.token == usize::MAX {
799 OnDisconnect::new_inner(true, self.inner.clone())
800 } else {
801 OnDisconnect::new_inner(false, self.inner.clone())
802 }
803 }
804}
805
806impl Future for OnDisconnect {
807 type Output = ();
808
809 #[inline]
810 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
811 self.poll_ready(cx)
812 }
813}
814
815#[cfg(test)]
816mod tests {
817 use ntex_bytes::{BufMut, BytePages, Bytes, BytesMut};
818 use ntex_codec::BytesCodec;
819 use ntex_util::{future::lazy, time::Millis, time::sleep};
820
821 use super::*;
822 use crate::{
823 FilterBuf, IoContext, IoTaskStatus, Readiness, ops::Iops, testing::IoTest,
824 };
825
    // Sample request, as bytes and as text (identical content).
    const BIN: &[u8] = b"GET /test HTTP/1\r\n\r\n";
    const TEXT: &str = "GET /test HTTP/1\r\n\r\n";
    // 32-byte payload used by the back-pressure tests.
    const BIN2: &[u8] = b"12345678901234561234567890123456";
829
    /// An io object compares equal to itself, both directly and through its `IoRef`.
    #[ntex::test]
    async fn test_basics() {
        let (client, server) = IoTest::create();
        client.remote_buffer_cap(1024);

        let server = Io::from(server);
        assert!(server.eq(&server));
        assert!(server.io_ref().eq(server.io_ref()));
    }
839
    /// `recv` reports a timeout and recovers from write back-pressure.
    #[ntex::test]
    async fn test_recv() {
        let (client, server) = IoTest::create();
        client.remote_buffer_cap(1024);

        let server = Io::new(server, SharedCfg::new("SRV"));

        // a timer notification makes `recv` fail with a "Timeout" error
        server.st().notify_timeout();
        let err = server.recv(&BytesCodec).await.err().unwrap();
        assert!(format!("{err:?}").contains("Timeout"));

        // with the back-pressure flag set, `recv` flushes first and then still yields the item
        client.write(TEXT);
        server.st().flags.set_wr_backpressure();
        let item = server.recv(&BytesCodec).await.ok().unwrap().unwrap();
        assert_eq!(item, TEXT);
    }
856
    /// Data passed to `send` arrives at the peer unchanged.
    #[ntex::test]
    async fn test_send() {
        let (client, server) = IoTest::create();
        client.remote_buffer_cap(1024);

        let server = Io::from(server);
        assert!(server.eq(&server));

        server
            .send(Bytes::from_static(BIN), &BytesCodec)
            .await
            .ok()
            .unwrap();
        let item = client.read_any();
        assert_eq!(item, TEXT);
    }
873
    /// Walks the read-side flags (ready / paused / back-pressure) through a
    /// sequence of reads, driving the io-task side manually via `IoContext`.
    #[ntex::test]
    async fn read() {
        let io = Io::new(
            IoTest::create().0,
            SharedCfg::new("SRV").add(IoConfig::default().set_read_buf(8, 4, 16)),
        );
        // nothing has been read yet: the dispatcher poll is pending and registers its waker
        assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_pending());
        assert!(io.st().dispatch_task.is_set());

        let ctx = IoContext::new(io.get_ref());

        // io-task side: reading is allowed and the read task waker gets registered
        assert_eq!(
            lazy(|cx| ctx.poll_read_ready(cx)).await,
            Poll::Ready(Readiness::Ready)
        );
        assert!(io.st().read_task.is_set());
        assert!(!io.st().flags.is_read_ready());
        assert!(!io.st().flags.is_rd_backpressure());

        // deliver 10 bytes
        ctx.update_read_status(BytesMut::copy_from_slice(b"1234567890"), Ok(10));

        // the dispatcher waker was consumed; read is paused, ready and back-pressured
        assert!(!io.st().dispatch_task.is_set());
        assert!(io.st().flags.is_read_paused());
        assert!(io.st().flags.is_read_ready());
        assert!(io.st().flags.is_rd_backpressure());
        assert_eq!(lazy(|cx| ctx.poll_read_ready(cx)).await, Poll::Pending);

        // consuming one byte keeps the ready and back-pressure flags set
        assert_eq!(io.with_read_buf(|buf| buf.split_to(1)), b"1");
        assert!(io.st().flags.is_read_ready());
        assert!(io.st().flags.is_rd_backpressure());

        assert!(io.st().read_task.is_set());

        // another byte: back-pressure is still on
        assert_eq!(io.with_read_buf(|buf| buf.split_to(1)), b"2");
        assert!(io.st().flags.is_rd_backpressure());

        // after four more bytes all read flags are cleared and the read task was woken
        assert_eq!(io.with_read_buf(|buf| buf.split_to(4)), b"3456");
        assert!(!io.st().flags.is_read_paused());
        assert!(!io.st().flags.is_read_ready());
        assert!(!io.st().flags.is_rd_backpressure());
        assert!(!io.st().read_task.is_set());
        assert_eq!(
            lazy(|cx| ctx.poll_read_ready(cx)).await,
            Poll::Ready(Readiness::Ready)
        );

        // register the dispatcher waker again
        lazy(|cx| io.poll_dispatch(cx)).await;

        // deliver 4 more bytes on top of the 4 still buffered
        ctx.update_read_status(BytesMut::copy_from_slice(b"1234"), Ok(4));

        assert!(!io.st().dispatch_task.is_set());
        assert!(io.st().flags.is_read_paused());
        assert!(io.st().flags.is_read_ready());
        assert!(io.st().flags.is_rd_backpressure());
        assert_eq!(lazy(|cx| ctx.poll_read_ready(cx)).await, Poll::Pending);

        // draining the older 4 bytes releases back-pressure
        assert_eq!(io.with_read_buf(|buf| buf.split_to(4)), b"7890");
        assert!(!io.st().flags.is_rd_backpressure());

        lazy(|cx| io.poll_dispatch(cx)).await;

        // 3 more bytes: ready, but neither paused nor back-pressured
        ctx.update_read_status(BytesMut::copy_from_slice(b"567"), Ok(3));

        assert!(!io.st().flags.is_read_paused());
        assert!(io.st().flags.is_read_ready());
        assert!(!io.st().flags.is_rd_backpressure());
        assert_eq!(
            lazy(|cx| ctx.poll_read_ready(cx)).await,
            Poll::Ready(Readiness::Ready)
        );

        // taking the whole buffer clears the ready flag
        assert_eq!(io.with_read_buf(BytesMut::take), b"1234567");
        assert!(!io.st().flags.is_read_paused());
        assert!(!io.st().flags.is_read_ready());
        assert!(io.st().read_task.is_set());

        // termination wakes the read task and is reported to the io-task side
        io.terminate();
        assert!(!io.st().read_task.is_set());
        assert_eq!(
            lazy(|cx| ctx.poll_read_ready(cx)).await,
            Poll::Ready(Readiness::Terminate)
        );
    }
997
    /// Exercises `poll_read_notify`: one notification per delivered chunk,
    /// including interaction with read back-pressure and termination.
    #[ntex::test]
    async fn read_notify() {
        let io = Io::new(
            IoTest::create().0,
            SharedCfg::new("SRV").add(IoConfig::default().set_read_buf(8, 4, 16)),
        );
        // the first poll sets the notify flag and registers the dispatcher waker
        assert!(!io.st().flags.is_read_notify());
        assert!(lazy(|cx| io.poll_read_notify(cx)).await.is_pending());
        assert!(io.st().dispatch_task.is_set());
        assert!(io.st().flags.is_read_notify());

        let ctx = IoContext::new(io.get_ref());

        // deliver one byte
        ctx.update_read_status(BytesMut::copy_from_slice(b"1"), Ok(1));

        // the dispatcher was woken and the notification is recorded
        assert!(!io.st().dispatch_task.is_set());
        assert!(io.st().flags.is_read_ready());
        assert!(io.st().flags.is_read_notify());
        assert!(io.st().flags.is_read_notified());
        let res = lazy(|cx| io.poll_read_notify(cx)).await;
        assert!(matches!(res, Poll::Ready(Ok(Some(())))));

        // a ready result does not register the waker; the ready flag stays set
        assert!(!io.st().dispatch_task.is_set());
        assert!(io.st().flags.is_read_ready());

        // the next poll waits for a new notification even though data is buffered
        assert!(lazy(|cx| io.poll_read_notify(cx)).await.is_pending());
        assert!(io.st().dispatch_task.is_set());
        assert!(io.st().flags.is_read_notify());
        assert!(io.st().flags.is_read_ready());
        assert_eq!(
            lazy(|cx| ctx.poll_read_ready(cx)).await,
            Poll::Ready(Readiness::Ready)
        );

        // deliver 7 more bytes: back-pressure is expected
        ctx.update_read_status(BytesMut::copy_from_slice(b"2345678"), Ok(7));
        assert!(io.st().flags.is_rd_backpressure());

        assert!(io.st().flags.is_read_ready());
        assert!(io.st().flags.is_read_notify());
        assert!(io.st().flags.is_read_notified());
        let res = lazy(|cx| io.poll_read_notify(cx)).await;
        assert!(matches!(res, Poll::Ready(Ok(Some(())))));
        assert_eq!(lazy(|cx| ctx.poll_read_ready(cx)).await, Poll::Pending);
        assert!(io.st().read_task.is_set());

        // polling again resets the read flags and wakes the read task
        assert!(lazy(|cx| io.poll_read_notify(cx)).await.is_pending());
        assert!(!io.st().flags.is_rd_backpressure());
        assert!(!io.st().flags.is_read_ready());
        assert!(!io.st().flags.is_read_paused());
        assert!(!io.st().read_task.is_set());
        assert_eq!(
            lazy(|cx| ctx.poll_read_ready(cx)).await,
            Poll::Ready(Readiness::Ready)
        );

        // one more byte on top of the unconsumed data
        ctx.update_read_status(BytesMut::copy_from_slice(b"1"), Ok(1));
        assert!(!io.st().dispatch_task.is_set());
        assert!(io.st().flags.is_read_ready());
        assert!(io.st().flags.is_read_notify());
        assert!(io.st().flags.is_read_paused());
        assert!(io.st().flags.is_rd_backpressure());
        assert!(io.st().flags.is_read_notified());
        assert!(matches!(
            lazy(|cx| io.poll_read_notify(cx)).await,
            Poll::Ready(Ok(Some(())))
        ));

        // after termination the notify poll resolves with `None`
        io.terminate();
        let res = lazy(|cx| io.poll_read_notify(cx)).await;
        assert!(matches!(res, Poll::Ready(Ok(None))), "{res:?}");
    }
1090
    /// Read readiness against the test stream: pending without data, ready once
    /// the client has written, and still ready on a repeated poll.
    #[ntex::test]
    async fn read_readiness() {
        let (client, server) = IoTest::create();
        client.remote_buffer_cap(1024);

        let io = Io::from(server);
        assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_pending());

        client.write(TEXT);
        assert_eq!(io.read_ready().await.unwrap(), Some(()));
        assert!(matches!(
            lazy(|cx| io.poll_read_ready(cx)).await,
            Poll::Ready(Ok(Some(())))
        ));

        let item = io.with_read_buf(BytesMut::take);
        assert_eq!(item, Bytes::from_static(BIN));

        // give the io task time to pick up the second write
        client.write(TEXT);
        sleep(Millis(50)).await;
        assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_ready());
        assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_ready());
    }
1114
    /// Read back-pressure is set after two 32-byte writes and cleared once the
    /// data has been received.
    #[ntex::test]
    async fn read_backpressure() {
        let (client, server) = IoTest::create();

        let io = Io::new(
            server,
            SharedCfg::new("SRV").add(IoConfig::default().set_read_buf(64, 32, 12)),
        );
        assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_pending());

        client.write(BIN2);
        client.write(BIN2);
        sleep(Millis(50)).await;
        assert!(io.flags().is_read_ready());
        assert!(io.flags().is_rd_backpressure());
        // receiving the buffered data clears both flags
        let _item = io.recv(&BytesCodec).await.ok().unwrap().unwrap();
        assert!(!io.flags().is_read_ready());
        assert!(!io.flags().is_rd_backpressure());

        // same again, this time observed through `read_ready`
        client.write(BIN2);
        client.write(BIN2);
        sleep(Millis(50)).await;
        assert!(io.flags().is_read_ready());
        assert!(io.flags().is_rd_backpressure());
        assert_eq!(io.read_ready().await.unwrap(), Some(()));
    }
1141
    /// Walks the write-side flags (paused / send-scheduled / back-pressure /
    /// flush) through a sequence of writes, driving the io-task side manually.
    #[ntex::test]
    async fn write() {
        let io = Io::new(
            IoTest::create().0,
            SharedCfg::new("SRV").add(IoConfig::default().set_write_buf(8, 4, 16)),
        );
        assert!(lazy(|cx| io.poll_status_update(cx)).await.is_pending());
        assert!(io.st().dispatch_task.is_set());
        assert!(io.st().flags.is_direct_wr_enabled());

        let ctx = IoContext::new(io.get_ref());

        // nothing to write: the write task parks itself
        assert_eq!(lazy(|cx| ctx.poll_write_ready(cx)).await, Poll::Pending);
        assert!(io.st().write_task.is_set());
        assert!(io.st().flags.is_write_paused());
        assert!(!io.st().flags.is_wr_backpressure());

        // 4 bytes: a send is scheduled, no back-pressure yet
        io.with_write_buf(|buf| buf.put_slice(b"1234")).unwrap();
        assert_eq!(lazy(|cx| ctx.poll_write_ready(cx)).await, Poll::Pending);
        assert!(io.st().flags.is_write_paused());
        assert!(io.st().flags.is_wr_send_scheduled());
        assert!(!io.st().flags.is_wr_backpressure());
        assert!(io.st().dispatch_task.is_set());

        // 8 bytes buffered: back-pressure is set and the dispatcher is woken
        io.with_write_buf(|buf| buf.put_slice(b"5678")).unwrap();
        assert!(io.st().flags.is_wr_backpressure());
        assert!(!io.st().dispatch_task.is_set());
        assert!(io.st().write_task.is_set());
        assert!(matches!(
            lazy(|cx| io.poll_status_update(cx)).await,
            Poll::Ready(IoStatusUpdate::WriteBackpressure)
        ));
        assert!(lazy(|cx| io.poll_flush(cx, false)).await.is_pending());
        assert!(!io.st().flags.is_write_flush());

        // running the scheduled operations wakes the write task
        Iops::run();
        assert!(!io.st().flags.is_wr_send_scheduled());
        assert!(!io.st().flags.is_write_paused());
        assert!(!io.st().write_task.is_set());
        assert_eq!(
            lazy(|cx| ctx.poll_write_ready(cx)).await,
            Poll::Ready(Readiness::Ready)
        );

        // write out the first 4 bytes; data remains, so the io task keeps going
        assert_eq!(ctx.with_write_buf(|buf| buf.split_to(4).freeze()), b"1234");
        assert_eq!(ctx.update_write_status(Ok(true)), IoTaskStatus::Io);
        assert_eq!(
            lazy(|cx| ctx.poll_write_ready(cx)).await,
            Poll::Ready(Readiness::Ready)
        );
        assert!(!io.st().flags.is_write_paused());
        assert!(io.st().flags.is_wr_backpressure());
        // the status poll reports back-pressure once more and clears the flag
        assert!(matches!(
            lazy(|cx| io.poll_status_update(cx)).await,
            Poll::Ready(IoStatusUpdate::WriteBackpressure)
        ));
        assert!(!io.st().flags.is_wr_backpressure());
        assert!(lazy(|cx| io.poll_status_update(cx)).await.is_pending());
        assert!(matches!(
            lazy(|cx| io.poll_flush(cx, false)).await,
            Poll::Ready(Ok(()))
        ));

        // a full flush with data buffered stays pending and sets the flush flag
        io.with_write_buf(|buf| buf.put_slice(b"1234")).unwrap();
        assert!(lazy(|cx| io.poll_flush(cx, true)).await.is_pending());
        assert!(io.st().flags.is_write_flush());
        assert!(io.st().flags.is_wr_backpressure());

        // drain everything: the io task pauses and the dispatcher is woken
        Iops::run();
        assert_eq!(ctx.with_write_buf(BytePages::freeze), b"56781234");
        assert!(!io.st().flags.is_wr_send_scheduled());
        assert_eq!(ctx.update_write_status(Ok(true)), IoTaskStatus::Pause);
        assert!(io.st().flags.is_write_paused());
        assert!(io.st().flags.is_write_flush());
        assert!(io.st().flags.is_wr_backpressure());
        assert!(!io.st().dispatch_task.is_set());

        // with an empty buffer the flush completes and clears both flags
        assert!(matches!(
            lazy(|cx| io.poll_flush(cx, false)).await,
            Poll::Ready(Ok(()))
        ));
        assert!(!io.st().flags.is_write_flush());
        assert!(!io.st().flags.is_wr_backpressure());

        // after termination: write task woken, flush fails, status reports `PeerGone`
        io.terminate();
        assert!(!io.st().write_task.is_set());
        assert_eq!(
            lazy(|cx| ctx.poll_write_ready(cx)).await,
            Poll::Ready(Readiness::Terminate)
        );
        let Poll::Ready(Err(err)) = lazy(|cx| io.poll_flush(cx, false)).await else {
            panic!()
        };
        assert_eq!(err.kind(), io::ErrorKind::NotConnected);
        assert!(matches!(
            lazy(|cx| io.poll_status_update(cx)).await,
            Poll::Ready(IoStatusUpdate::PeerGone(None))
        ));
    }
1285
    /// Write back-pressure against the test stream: set while the peer accepts
    /// no data, released after the peer has read and the status was polled.
    #[ntex::test]
    async fn write_backpressure() {
        let (client, server) = IoTest::create();
        // the peer accepts no data at first
        client.remote_buffer_cap(0);

        let io = Io::new(
            server,
            SharedCfg::new("SRV").add(IoConfig::default().set_write_buf(16, 8, 12)),
        );
        assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_pending());
        assert!(io.flags().is_write_paused());
        assert!(!io.flags().is_wr_backpressure());
        assert!(!io.is_wr_backpressure());

        // 32 bytes: the io object is registered with `Iops` and back-pressure is set
        io.encode_slice(BIN2).unwrap();
        assert!(Iops::is_registered(&io));
        assert!(io.flags().is_wr_backpressure());

        // let the peer read; the flag stays set until the status is polled
        client.remote_buffer_cap(1024);
        let item = client.read().await.unwrap();
        assert_eq!(item, BIN2);
        assert!(io.flags().is_wr_backpressure());
        assert!(matches!(
            lazy(|cx| io.poll_status_update(cx)).await,
            Poll::Ready(IoStatusUpdate::WriteBackpressure)
        ));
        assert!(!io.flags().is_wr_backpressure());
        assert!(matches!(
            lazy(|cx| io.poll_flush(cx, false)).await,
            Poll::Ready(Ok(()))
        ));
        assert!(!io.flags().is_wr_backpressure());
    }
1319
    /// Shutdown sequence: `close()` starts stopping the filters, a filter layer
    /// can still be added and shut down, and `ctx.stop` terminates the io object.
    #[ntex::test]
    async fn shutdown() {
        // pass-through filter layer
        #[derive(Debug)]
        struct F;

        impl FilterLayer for F {
            fn process_read_buf(&self, _: &FilterBuf<'_>) -> io::Result<()> {
                Ok(())
            }
            fn process_write_buf(&self, _: &FilterBuf<'_>) -> io::Result<()> {
                Ok(())
            }
        }

        let io = Io::new(
            IoTest::create().0,
            SharedCfg::new("SRV").add(IoConfig::default().set_write_buf(8, 4, 16)),
        );
        let st = io.st();
        assert!(lazy(|cx| io.poll_status_update(cx)).await.is_pending());
        assert!(st.dispatch_task.is_set());
        assert!(!st.flags.is_closed());
        assert!(!st.flags.is_stopping_filters());

        let ctx = IoContext::new(io.get_ref());

        // closing starts the filter shutdown but does not close the io yet
        io.close();
        assert!(!st.flags.is_closed());
        assert!(st.flags.is_stopping_filters());
        // writes are rejected while shutting down
        let err = io.with_write_buf(|_| 1).unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::Other);

        let io = io.add_filter(F);
        let layer = Layer::new(F, Base::new(io.get_ref()));

        // shutting the layer down drains the 3 bytes put into the write source buffer
        let st = io.st();
        st.buffer.with_write_src(|p| p.put_slice(b"123"));
        assert_eq!(st.buffer.write_buf_size(), 3);
        let res = st.buffer.with_filter(io.as_ref(), |f| layer.shutdown(f));
        assert!(matches!(res, Ok(Poll::Ready(()))));
        assert_eq!(st.buffer.write_buf_size(), 0);

        // stopping from the io-task side closes and terminates the io object
        ctx.stop(None);
        assert!(st.flags.is_closed());
        assert!(st.flags.is_terminated());
        assert!(st.flags.is_stopping_filters());

        // writes now fail with `NotConnected`
        let err = io.with_write_buf(|_| 1).unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::NotConnected);
    }
1374}