1use std::cell::{Cell, UnsafeCell};
2use std::future::{Future, poll_fn};
3use std::task::{Context, Poll};
4use std::{fmt, hash, io, marker, mem, ops, pin::Pin, ptr, rc::Rc};
5
6use ntex_bytes::{BytePageSize, BytesMut};
7use ntex_codec::{Decoder, Encoder};
8use ntex_service::cfg::{Cfg, SharedCfg};
9use ntex_util::{future::Either, task::LocalWaker, time::Sleep};
10
11use crate::buf::Stack;
12use crate::cfg::IoConfig;
13use crate::ctx::IoContext;
14use crate::filter::{Base, Filter, Layer};
15use crate::filterptr::FilterPtr;
16use crate::flags::Flags;
17use crate::ops::{Id, IoManager, TimerHandle};
18use crate::seal::{IoBoxed, Sealed};
19use crate::{Decoded, FilterLayer, Handle, IoStatusUpdate, IoStream, RecvError};
20
/// Interface object to an underlying io stream.
///
/// Holds an [`IoRef`] inside an `UnsafeCell` so that `&self` methods can swap
/// the reference out in place; `F` is only a marker for the filter type
/// (defaults to [`Base`]) and is not stored.
pub struct Io<F = Base>(UnsafeCell<IoRef>, marker::PhantomData<F>);
23
/// Cloneable handle to the shared io state.
///
/// Cloning only clones the inner `Rc`, so all clones observe the same state.
#[derive(Clone)]
pub struct IoRef(pub(super) Rc<IoState>);
26
/// Shared per-connection state referenced by every `IoRef` clone.
pub(crate) struct IoState {
    /// Pointer to the currently installed filter.
    filter: FilterPtr,
    /// Identifier of this io object; assigned from `IoManager::register` in `Io::new`.
    pub(super) id: Cell<Id>,
    /// Io configuration (log tag, read/write buffer parameters).
    pub(super) cfg: Cfg<IoConfig>,
    /// State flags of the io object.
    pub(super) flags: Flags,
    /// Error stored when the connection is terminated with an error.
    pub(super) error: Cell<Option<io::Error>>,
    /// Waker woken by `wake_read_task`.
    pub(super) read_task: LocalWaker,
    /// Waker woken by `wake_write_task`.
    pub(super) write_task: LocalWaker,
    /// Waker woken by `wake_dispatch_task`; registered by the `poll_*` methods of `Io`.
    dispatch_task: LocalWaker,
    /// Read/write buffers of the filter stack.
    pub(super) buffer: Stack,
    /// Handle returned by `IoStream::start`; dropped when the connection is terminated.
    pub(super) handle: Cell<Option<Box<dyn Handle>>>,
    /// Timer handle; presumably the keep-alive/dispatcher timer — TODO confirm.
    pub(super) timeout: Cell<TimerHandle>,
    /// Optional sleep used while shutting down — usage is outside this file, verify.
    pub(super) shutdown_timeout: Cell<Option<Sleep>>,
    /// Wakers of pending `OnDisconnect` futures, indexed by their token.
    #[allow(clippy::box_collection)]
    pub(super) on_disconnect: Cell<Option<Box<Vec<LocalWaker>>>>,
}
43
44impl IoState {
45 pub(super) fn id(&self) -> Id {
46 self.id.get()
47 }
48
49 pub(super) fn tag(&self) -> &'static str {
50 self.cfg.tag()
51 }
52
53 pub(super) fn filter(&self) -> &dyn Filter {
54 self.filter.get()
55 }
56
57 pub(super) fn notify_timeout(&self) {
58 if self.flags.check_dispatcher_timeout_unset() {
59 self.wake_dispatch_task();
60 log::trace!("{}: Timer, notify dispatcher", self.cfg.tag());
61 }
62 }
63
64 pub(super) fn notify_disconnect(&self) {
65 if let Some(on_disconnect) = self.on_disconnect.take() {
66 for item in on_disconnect.into_iter() {
67 item.wake();
68 }
69 }
70 }
71
72 pub(super) fn error(&self) -> Option<io::Error> {
74 if let Some(err) = self.error.take() {
75 self.error
76 .set(Some(io::Error::new(err.kind(), format!("{err}"))));
77 Some(err)
78 } else {
79 None
80 }
81 }
82
83 pub(super) fn error_or_disconnected(&self) -> io::Error {
85 self.error()
86 .unwrap_or_else(|| io::Error::new(io::ErrorKind::NotConnected, "Disconnected"))
87 }
88
89 pub(super) fn filters_stopped(&self) {
90 self.wake_read_task();
91 self.wake_write_task();
92 self.wake_dispatch_task();
93 self.flags.set_filters_stopped();
94 }
95
96 pub(super) fn terminate_connection(&self, err: Option<io::Error>) {
97 if !self.flags.is_terminated() {
98 log::trace!("{}: Terminate io with error {:?}", self.cfg.tag(), err);
99 if err.is_some() {
100 self.error.set(err);
101 }
102 self.flags.set_terminate();
103 self.wake_read_task();
104 self.wake_write_task();
105 self.wake_dispatch_task();
106 self.notify_disconnect();
107 self.handle.take();
108 }
109 }
110
111 pub(super) fn start_shutdown(&self) {
113 if !self.flags.is_stopping_any() {
114 log::trace!("{}: Initiate io shutdown {:?}", self.cfg.tag(), self.flags);
115 self.flags.set_filter_stopping();
116 self.wake_read_task();
117 self.wake_write_task();
118 }
119 }
120
121 pub(super) fn get_read_buf(&self) -> BytesMut {
122 self.cfg.read_buf().get()
123 }
124
125 pub(super) fn is_rd_backpressure_needed(&self, size: usize) -> bool {
126 size >= self.cfg.read_buf().high
127 }
128
129 pub(super) fn is_wr_backpressure_needed(&self, size: usize) -> bool {
130 size >= self.cfg.write_buf().high
131 }
132
133 pub(super) fn should_disable_wr_backpressure(&self, size: usize) -> bool {
134 size <= self.cfg.write_buf().half
135 }
136
137 pub(super) fn wake_read_task(&self) {
138 self.read_task.wake();
139 }
140
141 pub(super) fn wake_write_task(&self) {
142 #[cfg(feature = "trace")]
143 log::trace!("{}: Wake write task, flags:{:?}", self.tag(), self.flags);
144 self.write_task.wake();
145 }
146
147 pub(super) fn wake_dispatch_task(&self) {
148 self.dispatch_task.wake();
149 }
150}
151
152impl Eq for IoState {}
153
154impl PartialEq for IoState {
155 #[inline]
156 fn eq(&self, other: &Self) -> bool {
157 ptr::eq(self, other)
158 }
159}
160
161impl hash::Hash for IoState {
162 #[inline]
163 fn hash<H: hash::Hasher>(&self, state: &mut H) {
164 (ptr::from_ref(self) as usize).hash(state);
165 }
166}
167
168impl fmt::Debug for IoState {
169 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
170 let err = self.error.take();
171 let res = f
172 .debug_struct("IoState")
173 .field("id", &self.id)
174 .field("flags", &self.flags)
175 .field("filter", &self.filter.is_set())
176 .field("timeout", &self.timeout)
177 .field("error", &err)
178 .field("buffer", &self.buffer)
179 .field("cfg", &self.cfg)
180 .finish();
181 self.error.set(err);
182 res
183 }
184}
185
186impl Io {
187 pub fn new<I: IoStream, T: Into<SharedCfg>>(io: I, cfg: T) -> Self {
189 let cfg = cfg.into().get::<IoConfig>();
190 let size = cfg.write_page_size();
191 let flags = Flags::new(cfg.write_buf_threshold() > 0);
192
193 let inner = Rc::new(IoState {
194 cfg,
195 flags,
196 id: Cell::new(Id::default()),
197 filter: FilterPtr::null(),
198 error: Cell::new(None),
199 dispatch_task: LocalWaker::new(),
200 read_task: LocalWaker::new(),
201 write_task: LocalWaker::new(),
202 buffer: Stack::new(size),
203 handle: Cell::new(None),
204 timeout: Cell::new(TimerHandle::default()),
205 shutdown_timeout: Cell::new(None),
206 on_disconnect: Cell::new(None),
207 });
208 inner.filter.set(Base::new(IoRef(inner.clone())));
209
210 let ioref = IoRef(inner);
211 ioref.0.id.set(IoManager::register(&ioref));
212
213 let hnd = io.start(IoContext::new(ioref.clone()));
215 ioref.0.handle.set(Some(hnd));
216
217 Io(UnsafeCell::new(ioref), marker::PhantomData)
218 }
219}
220
221impl<I: IoStream> From<I> for Io {
222 #[inline]
223 fn from(io: I) -> Io {
224 Io::new(io, SharedCfg::default())
225 }
226}
227
228impl IoRef {
229 fn create_empty() -> IoRef {
230 IoRef(Rc::new(IoState {
231 id: Cell::new(Id::default()),
232 cfg: SharedCfg::default().get::<IoConfig>(),
233 filter: FilterPtr::null(),
234 flags: Flags::new_stopped(),
235 error: Cell::new(None),
236 dispatch_task: LocalWaker::new(),
237 read_task: LocalWaker::new(),
238 write_task: LocalWaker::new(),
239 buffer: Stack::new(BytePageSize::Size16),
240 handle: Cell::new(None),
241 timeout: Cell::new(TimerHandle::default()),
242 shutdown_timeout: Cell::new(None),
243 on_disconnect: Cell::new(None),
244 }))
245 }
246}
247
impl<F> Io<F> {
    /// Returns a cloned [`IoRef`] for this io object.
    #[inline]
    pub fn get_ref(&self) -> IoRef {
        self.io_ref().clone()
    }

    /// Moves the current io state into a new `Io`, leaving `self` with a
    /// freshly created empty state (see `IoRef::create_empty`).
    #[inline]
    #[must_use]
    pub fn take(&self) -> Self {
        Self(UnsafeCell::new(self.take_io_ref()), marker::PhantomData)
    }

    /// Replaces the inner `IoRef` with an empty one and returns the previous value.
    fn take_io_ref(&self) -> IoRef {
        // NOTE(review): writes through the `UnsafeCell` behind `&self`; this
        // presumably relies on no reference obtained from `st()`/`io_ref()`
        // being alive across the call — TODO confirm.
        unsafe { mem::replace(&mut *self.0.get(), IoRef::create_empty()) }
    }

    /// Shared state of the currently held `IoRef`.
    fn st(&self) -> &IoState {
        // NOTE(review): shared read through the `UnsafeCell`; the returned
        // borrow must not outlive a later `take_io_ref` — TODO confirm callers.
        unsafe { &(*self.0.get()).0 }
    }

    /// Borrow of the currently held `IoRef`.
    fn io_ref(&self) -> &IoRef {
        // NOTE(review): same aliasing assumption as `st()`.
        unsafe { &*self.0.get() }
    }

    /// Replaces the io configuration and applies its write page size to the buffer.
    #[inline]
    pub fn set_config<T: Into<SharedCfg>>(&self, cfg: T) {
        // NOTE(review): the whole body is `unsafe`; which of the calls below
        // actually requires it is not visible from this file — verify.
        unsafe {
            let cfg = cfg.into().get::<IoConfig>();
            self.st().buffer.set_page_size(cfg.write_page_size());
            self.st().cfg.replace(cfg);
        }
    }
}
286
287impl<F: FilterLayer, T: Filter> Io<Layer<F, T>> {
288 #[inline]
289 pub fn filter(&self) -> &F {
291 &self.st().filter.filter::<Layer<F, T>>().0
292 }
293}
294
295impl<F: Filter> Io<F> {
296 #[inline]
297 pub fn seal(self) -> Io<Sealed> {
299 let state = self.take_io_ref();
300 state.0.filter.seal::<F>();
301
302 Io(UnsafeCell::new(state), marker::PhantomData)
303 }
304
305 #[inline]
306 pub fn boxed(self) -> IoBoxed {
308 self.seal().into()
309 }
310
    /// Adds a new filter layer `nf` on top of the current filter.
    ///
    /// Pending write data is processed through the existing filters first and
    /// the read buffer is processed again once the new layer is installed; a
    /// failure of either step terminates the connection with that error.
    #[inline]
    pub fn add_filter<U>(self, nf: U) -> Io<Layer<U, F>>
    where
        U: FilterLayer,
    {
        // flush buffered writes through the current filter chain
        if let Err(e) = self.st().buffer.process_write_buf(&self) {
            self.st().terminate_connection(Some(e));
        }

        let state = self.take_io_ref();

        // NOTE(review): obtains `&mut IoState` from the `Rc` pointer via
        // `cast_mut`; presumably sound only while nothing else accesses the
        // buffer stack during `add_layer` — TODO confirm.
        unsafe { &mut *(Rc::as_ptr(&state.0).cast_mut()) }
            .buffer
            .add_layer(state.0.cfg.write_page_size());

        state.0.filter.add_filter::<F, U>(nf);

        let io = Io(UnsafeCell::new(state), marker::PhantomData);

        // let the new layer see data that is already in the read buffer
        if let Err(e) = io.st().buffer.process_read_buf(&io, 0) {
            io.st().terminate_connection(Some(e));
        }
        io
    }
344
345 pub fn map_filter<U, R>(self, f: U) -> Io<R>
347 where
348 U: FnOnce(F) -> R,
349 R: Filter,
350 {
351 if let Err(e) = self.st().buffer.process_write_buf(&self) {
354 self.st().terminate_connection(Some(e));
355 }
356
357 let state = self.take_io_ref();
358 state.0.filter.map_filter::<F, U, R>(f);
359
360 Io(UnsafeCell::new(state), marker::PhantomData)
361 }
362}
363
364impl<F> Io<F> {
365 pub async fn recv<U>(
367 &self,
368 codec: &U,
369 ) -> Result<Option<U::Item>, Either<U::Error, io::Error>>
370 where
371 U: Decoder,
372 {
373 loop {
374 return match poll_fn(|cx| self.poll_recv(codec, cx)).await {
375 Ok(item) => Ok(Some(item)),
376 Err(RecvError::KeepAlive) => Err(Either::Right(io::Error::new(
377 io::ErrorKind::TimedOut,
378 "Timeout",
379 ))),
380 Err(RecvError::WriteBackpressure) => {
381 poll_fn(|cx| self.poll_flush(cx, false))
382 .await
383 .map_err(Either::Right)?;
384 continue;
385 }
386 Err(RecvError::Decoder(err)) => Err(Either::Left(err)),
387 Err(RecvError::PeerGone(Some(err))) => Err(Either::Right(err)),
388 Err(RecvError::PeerGone(None)) => Ok(None),
389 };
390 }
391 }
392
393 pub async fn read(&self, dst: &mut [u8]) -> io::Result<()> {
397 loop {
398 let completed = self.with_read_buf(|buf| {
399 if buf.len() >= dst.len() {
400 let _ = io::Read::read(buf, dst).expect("Cannot fail");
401 true
402 } else {
403 false
404 }
405 });
406 if completed {
407 return Ok(());
408 }
409 self.read_ready().await?;
410 }
411 }
412
    /// Waits until the io stream is ready for reading.
    ///
    /// Async wrapper around [`poll_read_ready`](Self::poll_read_ready);
    /// resolves to `Ok(None)` when the io is closed.
    #[inline]
    pub async fn read_ready(&self) -> io::Result<Option<()>> {
        poll_fn(|cx| self.poll_read_ready(cx)).await
    }
418
    /// Waits for a read notification.
    ///
    /// Async wrapper around [`poll_read_notify`](Self::poll_read_notify).
    #[inline]
    pub async fn read_notify(&self) -> io::Result<Option<()>> {
        poll_fn(|cx| self.poll_read_notify(cx)).await
    }
424
425 #[inline]
426 pub fn pause(&self) {
428 let st = self.st();
429 if !st.flags.is_read_paused() {
430 st.wake_read_task();
431 st.flags.set_read_paused();
432 }
433 }
434
435 #[inline]
436 pub async fn send<U>(
438 &self,
439 item: U::Item,
440 codec: &U,
441 ) -> Result<(), Either<U::Error, io::Error>>
442 where
443 U: Encoder,
444 {
445 self.encode(item, codec).map_err(Either::Left)?;
446
447 poll_fn(|cx| self.poll_flush(cx, true))
448 .await
449 .map_err(Either::Right)?;
450
451 Ok(())
452 }
453
    /// Flushes the write buffer.
    ///
    /// Async wrapper around [`poll_flush`](Self::poll_flush); `full` is passed
    /// through unchanged.
    #[inline]
    pub async fn flush(&self, full: bool) -> io::Result<()> {
        poll_fn(|cx| self.poll_flush(cx, full)).await
    }
461
    /// Shuts down the io stream.
    ///
    /// Async wrapper around [`poll_shutdown`](Self::poll_shutdown).
    #[inline]
    pub async fn shutdown(&self) -> io::Result<()> {
        poll_fn(|cx| self.poll_shutdown(cx)).await
    }
467
    /// Polls for read readiness.
    ///
    /// Returns `Ready(Ok(None))` when the io is closed and
    /// `Ready(Ok(Some(())))` when the read-ready flag is set. Otherwise the
    /// dispatcher waker is registered and `Pending` is returned. If reading
    /// was paused or under back-pressure, the read flags are cleared and the
    /// read task is woken first.
    #[inline]
    pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Option<()>>> {
        let st = self.st();

        if st.flags.is_closed() {
            Poll::Ready(Ok(None))
        } else {
            // sample readiness before the flags are cleared below
            let ready = st.flags.is_read_ready();

            if st.flags.is_read_paused_or_backpressure() {
                // resume reading: clear read state and wake the read task
                st.flags.unset_all_read_flags();
                st.flags.unset_read_paused();
                st.wake_read_task();
                if ready {
                    Poll::Ready(Ok(Some(())))
                } else {
                    st.dispatch_task.register(cx.waker());
                    Poll::Pending
                }
            } else if ready {
                Poll::Ready(Ok(Some(())))
            } else {
                // NOTE(review): this check looks unreachable if
                // `is_read_paused_or_backpressure()` covers the paused flag —
                // confirm against `Flags`.
                if st.flags.is_read_paused() {
                    st.wake_read_task();
                    st.flags.unset_read_paused();
                }
                st.dispatch_task.register(cx.waker());
                Poll::Pending
            }
        }
    }
515
516 #[inline]
517 pub fn poll_read_notify(&self, cx: &mut Context<'_>) -> Poll<io::Result<Option<()>>> {
519 let st = self.st();
520 if st.flags.is_stopping() {
521 Poll::Ready(Ok(None))
522 } else if st.flags.check_read_notifed() {
523 Poll::Ready(Ok(Some(())))
524 } else {
525 st.flags.set_read_notify();
526 if self.poll_read_ready(cx).is_ready() {
527 st.dispatch_task.register(cx.waker());
528 }
529 st.dispatch_task.register(cx.waker());
530 Poll::Pending
531 }
532 }
533
534 #[inline]
535 pub fn poll_recv<U>(
540 &self,
541 codec: &U,
542 cx: &mut Context<'_>,
543 ) -> Poll<Result<U::Item, RecvError<U>>>
544 where
545 U: Decoder,
546 {
547 let decoded = self.poll_recv_decode(codec, cx)?;
548
549 if let Some(item) = decoded.item {
550 Poll::Ready(Ok(item))
551 } else {
552 Poll::Pending
553 }
554 }
555
    /// Decodes the next item from the read buffer and reports the decode result.
    ///
    /// When no item could be decoded, the io state is checked in this order:
    /// stopping (`PeerGone`), dispatcher timeout (`KeepAlive`), write
    /// back-pressure (`WriteBackpressure`); otherwise read readiness is polled
    /// so the dispatcher waker gets registered.
    ///
    /// # Errors
    ///
    /// Returns the corresponding [`RecvError`] for decoder failures and for
    /// the state conditions listed above.
    #[inline]
    pub fn poll_recv_decode<U>(
        &self,
        codec: &U,
        cx: &mut Context<'_>,
    ) -> Result<Decoded<U::Item>, RecvError<U>>
    where
        U: Decoder,
    {
        let st = self.st();
        // readiness is consumed by this decode attempt
        st.flags.unset_read_ready();

        let decoded = self
            .decode_item(codec)
            .map_err(|err| RecvError::Decoder(err))?;

        if decoded.item.is_some() {
            Ok(decoded)
        } else if st.flags.is_stopping() {
            Err(RecvError::PeerGone(st.error()))
        } else if st.flags.check_dispatcher_timeout() {
            Err(RecvError::KeepAlive)
        } else if st.flags.is_wr_backpressure() {
            Err(RecvError::WriteBackpressure)
        } else {
            match self.poll_read_ready(cx) {
                // both outcomes hand the (item-less) decode result back to the caller
                Poll::Pending | Poll::Ready(Ok(Some(()))) => {
                    #[cfg(feature = "trace")]
                    if decoded.remains != 0 {
                        log::trace!("{}: Not enough data to decode next frame", self.tag());
                    }
                    Ok(decoded)
                }
                Poll::Ready(Err(e)) => Err(RecvError::PeerGone(Some(e))),
                Poll::Ready(Ok(None)) => Err(RecvError::PeerGone(None)),
            }
        }
    }
598
    /// Polls flushing of the write buffer.
    ///
    /// With `full == true` this stays `Pending` while any data is buffered.
    /// With `full == false` it stays `Pending` only while the buffered size is
    /// at or above the write high watermark (back-pressure).
    ///
    /// # Errors
    ///
    /// Returns an error if processing the write buffer fails, or if the io is
    /// closed (stored error or `NotConnected`).
    #[inline]
    pub fn poll_flush(&self, cx: &mut Context<'_>, full: bool) -> Poll<io::Result<()>> {
        let st = self.st();

        // push pending data through the filters before measuring the buffer
        st.buffer.process_write_buf_force(self)?;
        self.consolidate_write_state(false);

        let len = st.buffer.write_buf_size();
        if len > 0 {
            if st.flags.is_closed() {
                // buffered data can no longer be written
                return Poll::Ready(Err(st.error_or_disconnected()));
            } else if full {
                st.flags.set_wants_write_flush();
                st.dispatch_task.register(cx.waker());
                return Poll::Pending;
            } else if st.is_wr_backpressure_needed(len) {
                st.flags.set_wr_backpressure();
                st.dispatch_task.register(cx.waker());
                return Poll::Pending;
            }
        }
        if st.flags.is_closed() && !st.flags.is_write_flush() {
            Poll::Ready(Err(st.error_or_disconnected()))
        } else {
            st.flags.unset_wr_backpressure_and_flush();
            Poll::Ready(Ok(()))
        }
    }
632
633 #[inline]
634 pub fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
636 let st = self.st();
637
638 if st.flags.is_stopping() {
639 if let Some(err) = st.error() {
640 Poll::Ready(Err(err))
641 } else {
642 Poll::Ready(Ok(()))
643 }
644 } else {
645 if !st.flags.is_stopping_filters() {
646 st.start_shutdown();
647 }
648 st.flags.unset_all_read_flags();
649 st.flags.unset_read_paused();
650
651 st.wake_read_task();
652 st.dispatch_task.register(cx.waker());
653 Poll::Pending
654 }
655 }
656
    /// Pauses the read task and then polls for an io status update.
    ///
    /// Equivalent to calling [`pause`](Self::pause) followed by
    /// [`poll_status_update`](Self::poll_status_update).
    #[inline]
    pub fn poll_read_pause(&self, cx: &mut Context<'_>) -> Poll<IoStatusUpdate> {
        self.pause();
        self.poll_status_update(cx)
    }
665
    /// Polls for an io status change.
    ///
    /// Always registers the dispatcher waker, then reports, in priority
    /// order: `PeerGone` (io closed), `KeepAlive` (dispatcher timeout) or
    /// `WriteBackpressure`; `Pending` when none applies.
    #[inline]
    pub fn poll_status_update(&self, cx: &mut Context<'_>) -> Poll<IoStatusUpdate> {
        let st = self.st();
        st.dispatch_task.register(cx.waker());
        if st.flags.is_closed() {
            Poll::Ready(IoStatusUpdate::PeerGone(st.error()))
        } else if st.flags.check_dispatcher_timeout() {
            Poll::Ready(IoStatusUpdate::KeepAlive)
        } else if st.flags.is_wr_backpressure() {
            // clear the flag once the buffer drained to the `half` mark, but
            // still report back-pressure for this poll
            if st.should_disable_wr_backpressure(st.buffer.write_buf_size()) {
                st.flags.unset_wr_backpressure();
            }
            Poll::Ready(IoStatusUpdate::WriteBackpressure)
        } else {
            Poll::Pending
        }
    }
685
    /// Registers the task's waker as the dispatcher waker.
    #[inline]
    pub fn poll_dispatch(&self, cx: &mut Context<'_>) {
        self.st().dispatch_task.register(cx.waker());
    }
691}
692
/// Borrows the `IoRef` currently held by the io object.
impl<F> AsRef<IoRef> for Io<F> {
    #[inline]
    fn as_ref(&self) -> &IoRef {
        self.io_ref()
    }
}
699
impl<F> Eq for Io<F> {}

/// Equality is delegated to the held `IoRef`s.
impl<F> PartialEq for Io<F> {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.io_ref().eq(other.io_ref())
    }
}
708
/// Hashing is delegated to the held `IoRef`, consistent with `PartialEq`.
impl<F> hash::Hash for Io<F> {
    #[inline]
    fn hash<H: hash::Hasher>(&self, state: &mut H) {
        self.io_ref().hash(state);
    }
}
715
/// Formats as `Io { state: <IoState> }`.
impl<F> fmt::Debug for Io<F> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Io").field("state", self.st()).finish()
    }
}
721
/// Gives `Io` direct access to every `IoRef` method.
impl<F> ops::Deref for Io<F> {
    type Target = IoRef;

    #[inline]
    fn deref(&self) -> &Self::Target {
        self.io_ref()
    }
}
730
/// Stops the timer, terminates the connection and releases the filter when
/// the io object is dropped.
impl<F> Drop for Io<F> {
    fn drop(&mut self) {
        let st = self.st();
        self.stop_timer();

        // States without a filter (e.g. the placeholder left behind by
        // `take_io_ref`) skip termination and filter teardown.
        if st.filter.is_set() {
            if !st.flags.is_terminated() {
                log::trace!("{}: Io is dropped, terminate connection", st.tag());
            }

            st.terminate_connection(None);
            st.filter.drop_filter::<F>();
        }

        IoManager::unregister(self.io_ref());
    }
}
750
/// Future that resolves once the io object gets disconnected.
#[derive(Debug)]
#[must_use = "OnDisconnect do nothing unless polled"]
pub struct OnDisconnect {
    /// Index of this future's waker in `IoState::on_disconnect`, or
    /// `usize::MAX` when the io was already disconnected at creation time.
    token: usize,
    /// Shared io state being observed.
    inner: Rc<IoState>,
}
758
759impl OnDisconnect {
760 pub(super) fn new(inner: Rc<IoState>) -> Self {
761 Self::new_inner(inner.flags.is_stopping(), inner)
762 }
763
764 fn new_inner(disconnected: bool, inner: Rc<IoState>) -> Self {
765 let token = if disconnected {
766 usize::MAX
767 } else {
768 let mut on_disconnect = inner.on_disconnect.take();
769 let token = if let Some(ref mut on_disconnect) = on_disconnect {
770 let token = on_disconnect.len();
771 on_disconnect.push(LocalWaker::default());
772 token
773 } else {
774 on_disconnect = Some(Box::new(vec![LocalWaker::default()]));
775 0
776 };
777 inner.on_disconnect.set(on_disconnect);
778 token
779 };
780 Self { token, inner }
781 }
782
783 #[inline]
784 pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
786 if self.token == usize::MAX || self.inner.flags.is_stopping() {
787 Poll::Ready(())
788 } else if let Some(on_disconnect) = self.inner.on_disconnect.take() {
789 on_disconnect[self.token].register(cx.waker());
790 self.inner.on_disconnect.set(Some(on_disconnect));
791 Poll::Pending
792 } else {
793 Poll::Ready(())
794 }
795 }
796}
797
798impl Clone for OnDisconnect {
799 fn clone(&self) -> Self {
800 if self.token == usize::MAX {
801 OnDisconnect::new_inner(true, self.inner.clone())
802 } else {
803 OnDisconnect::new_inner(false, self.inner.clone())
804 }
805 }
806}
807
/// Resolves when `poll_ready` reports the disconnect.
impl Future for OnDisconnect {
    type Output = ();

    #[inline]
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.poll_ready(cx)
    }
}
816
817#[cfg(test)]
818mod tests {
819 use ntex_bytes::{BufMut, BytePages, Bytes, BytesMut};
820 use ntex_codec::BytesCodec;
821 use ntex_util::{future::lazy, time::Millis, time::sleep};
822
823 use super::*;
824 use crate::{
825 FilterBuf, IoContext, IoTaskStatus, Readiness, ops::Iops, testing::IoTest,
826 };
827
828 const BIN: &[u8] = b"GET /test HTTP/1\r\n\r\n";
829 const TEXT: &str = "GET /test HTTP/1\r\n\r\n";
830 const BIN2: &[u8] = b"12345678901234561234567890123456";
831
    /// An `Io` (and its `IoRef`) compares equal to itself.
    #[ntex::test]
    async fn test_basics() {
        let (client, server) = IoTest::create();
        client.remote_buffer_cap(1024);

        let server = Io::from(server);
        assert!(server.eq(&server));
        assert!(server.io_ref().eq(server.io_ref()));
    }
841
    /// `recv` reports a timeout error and recovers from write back-pressure.
    #[ntex::test]
    async fn test_recv() {
        let (client, server) = IoTest::create();
        client.remote_buffer_cap(1024);

        let server = Io::new(server, SharedCfg::new("SRV"));

        // a timer notification surfaces as a "Timeout" io error
        server.st().notify_timeout();
        let err = server.recv(&BytesCodec).await.err().unwrap();
        assert!(format!("{err:?}").contains("Timeout"));

        // with the back-pressure flag set, `recv` still delivers the data
        client.write(TEXT);
        server.st().flags.set_wr_backpressure();
        let item = server.recv(&BytesCodec).await.ok().unwrap().unwrap();
        assert_eq!(item, TEXT);
    }
858
    /// Data passed to `send` arrives at the peer.
    #[ntex::test]
    async fn test_send() {
        let (client, server) = IoTest::create();
        client.remote_buffer_cap(1024);

        let server = Io::from(server);
        assert!(server.eq(&server));

        server
            .send(Bytes::from_static(BIN), &BytesCodec)
            .await
            .ok()
            .unwrap();
        let item = client.read_any();
        assert_eq!(item, TEXT);
    }
875
    /// Walks the read path: flag and waker state as seen from the dispatcher
    /// side (`io`) and the io-task side (`ctx`) while data is delivered and
    /// consumed.
    #[ntex::test]
    async fn read() {
        let io = Io::new(
            IoTest::create().0,
            SharedCfg::new("SRV").add(IoConfig::default().set_read_buf(8, 4, 16)),
        );
        // nothing buffered: dispatcher poll is pending and registers its waker
        assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_pending());
        assert!(io.st().dispatch_task.is_set());

        let ctx = IoContext::new(io.get_ref());

        // io-task side may read; polling registers the read-task waker
        assert_eq!(
            lazy(|cx| ctx.poll_read_ready(cx)).await,
            Poll::Ready(Readiness::Ready)
        );
        assert!(io.st().read_task.is_set());
        assert!(!io.st().flags.is_read_ready());
        assert!(!io.st().flags.is_rd_backpressure());

        // deliver 10 bytes from the io task
        ctx.update_read_status(BytesMut::copy_from_slice(b"1234567890"), Ok(10));

        // dispatcher was woken; reading is paused under back-pressure
        assert!(!io.st().dispatch_task.is_set());
        assert!(io.st().flags.is_read_paused());
        assert!(io.st().flags.is_read_ready());
        assert!(io.st().flags.is_rd_backpressure());
        assert_eq!(lazy(|cx| ctx.poll_read_ready(cx)).await, Poll::Pending);

        // consuming a single byte does not release back-pressure
        assert_eq!(io.with_read_buf(|buf| buf.split_to(1)), b"1");
        assert!(io.st().flags.is_read_ready());
        assert!(io.st().flags.is_rd_backpressure());

        assert!(io.st().read_task.is_set());

        assert_eq!(io.with_read_buf(|buf| buf.split_to(1)), b"2");
        assert!(io.st().flags.is_rd_backpressure());

        // after consuming 4 more bytes the read flags are cleared and the
        // read task has been woken
        assert_eq!(io.with_read_buf(|buf| buf.split_to(4)), b"3456");
        assert!(!io.st().flags.is_read_paused());
        assert!(!io.st().flags.is_read_ready());
        assert!(!io.st().flags.is_rd_backpressure());
        assert!(!io.st().read_task.is_set());
        assert_eq!(
            lazy(|cx| ctx.poll_read_ready(cx)).await,
            Poll::Ready(Readiness::Ready)
        );

        lazy(|cx| io.poll_dispatch(cx)).await;

        // 4 more bytes on top of the 4 still buffered: back-pressure again
        ctx.update_read_status(BytesMut::copy_from_slice(b"1234"), Ok(4));

        assert!(!io.st().dispatch_task.is_set());
        assert!(io.st().flags.is_read_paused());
        assert!(io.st().flags.is_read_ready());
        assert!(io.st().flags.is_rd_backpressure());
        assert_eq!(lazy(|cx| ctx.poll_read_ready(cx)).await, Poll::Pending);

        assert_eq!(io.with_read_buf(|buf| buf.split_to(4)), b"7890");
        assert!(!io.st().flags.is_rd_backpressure());

        lazy(|cx| io.poll_dispatch(cx)).await;

        // 3 more bytes: ready, but neither paused nor under back-pressure
        ctx.update_read_status(BytesMut::copy_from_slice(b"567"), Ok(3));

        assert!(!io.st().flags.is_read_paused());
        assert!(io.st().flags.is_read_ready());
        assert!(!io.st().flags.is_rd_backpressure());
        assert_eq!(
            lazy(|cx| ctx.poll_read_ready(cx)).await,
            Poll::Ready(Readiness::Ready)
        );

        // draining the buffer clears the ready flag
        assert_eq!(io.with_read_buf(BytesMut::take), b"1234567");
        assert!(!io.st().flags.is_read_paused());
        assert!(!io.st().flags.is_read_ready());
        assert!(io.st().read_task.is_set());

        // termination wakes the read task and tells it to terminate
        io.terminate();
        assert!(!io.st().read_task.is_set());
        assert_eq!(
            lazy(|cx| ctx.poll_read_ready(cx)).await,
            Poll::Ready(Readiness::Terminate)
        );
    }
999
    /// Exercises `poll_read_notify`: the notify flag, delivery of
    /// notifications and interaction with read back-pressure.
    #[ntex::test]
    async fn read_notify() {
        let io = Io::new(
            IoTest::create().0,
            SharedCfg::new("SRV").add(IoConfig::default().set_read_buf(8, 4, 16)),
        );
        // first poll arms the notification and registers the dispatcher
        assert!(!io.st().flags.is_read_notify());
        assert!(lazy(|cx| io.poll_read_notify(cx)).await.is_pending());
        assert!(io.st().dispatch_task.is_set());
        assert!(io.st().flags.is_read_notify());

        let ctx = IoContext::new(io.get_ref());

        // incoming data marks the notification as delivered
        ctx.update_read_status(BytesMut::copy_from_slice(b"1"), Ok(1));

        assert!(!io.st().dispatch_task.is_set());
        assert!(io.st().flags.is_read_ready());
        assert!(io.st().flags.is_read_notify());
        assert!(io.st().flags.is_read_notified());
        let res = lazy(|cx| io.poll_read_notify(cx)).await;
        assert!(matches!(res, Poll::Ready(Ok(Some(())))));

        assert!(!io.st().dispatch_task.is_set());
        assert!(io.st().flags.is_read_ready());

        // the notification is consumed, so the next poll is pending again
        assert!(lazy(|cx| io.poll_read_notify(cx)).await.is_pending());
        assert!(io.st().dispatch_task.is_set());
        assert!(io.st().flags.is_read_notify());
        assert!(io.st().flags.is_read_ready());
        assert_eq!(
            lazy(|cx| ctx.poll_read_ready(cx)).await,
            Poll::Ready(Readiness::Ready)
        );

        // 7 more bytes push the buffer into back-pressure
        ctx.update_read_status(BytesMut::copy_from_slice(b"2345678"), Ok(7));
        assert!(io.st().flags.is_rd_backpressure());

        assert!(io.st().flags.is_read_ready());
        assert!(io.st().flags.is_read_notify());
        assert!(io.st().flags.is_read_notified());
        let res = lazy(|cx| io.poll_read_notify(cx)).await;
        assert!(matches!(res, Poll::Ready(Ok(Some(())))));
        assert_eq!(lazy(|cx| ctx.poll_read_ready(cx)).await, Poll::Pending);
        assert!(io.st().read_task.is_set());

        // re-arming the notification clears the read flags and wakes the read task
        assert!(lazy(|cx| io.poll_read_notify(cx)).await.is_pending());
        assert!(!io.st().flags.is_rd_backpressure());
        assert!(!io.st().flags.is_read_ready());
        assert!(!io.st().flags.is_read_paused());
        assert!(!io.st().read_task.is_set());
        assert_eq!(
            lazy(|cx| ctx.poll_read_ready(cx)).await,
            Poll::Ready(Readiness::Ready)
        );

        ctx.update_read_status(BytesMut::copy_from_slice(b"1"), Ok(1));
        assert!(!io.st().dispatch_task.is_set());
        assert!(io.st().flags.is_read_ready());
        assert!(io.st().flags.is_read_notify());
        assert!(io.st().flags.is_read_paused());
        assert!(io.st().flags.is_rd_backpressure());
        assert!(io.st().flags.is_read_notified());
        assert!(matches!(
            lazy(|cx| io.poll_read_notify(cx)).await,
            Poll::Ready(Ok(Some(())))
        ));

        // after termination the notification resolves with `None`
        io.terminate();
        let res = lazy(|cx| io.poll_read_notify(cx)).await;
        assert!(matches!(res, Poll::Ready(Ok(None))), "{res:?}");
    }
1092
    /// Read readiness follows data written by the peer.
    #[ntex::test]
    async fn read_readiness() {
        let (client, server) = IoTest::create();
        client.remote_buffer_cap(1024);

        let io = Io::from(server);
        assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_pending());

        // readiness is reported, and stays reported until the data is consumed
        client.write(TEXT);
        assert_eq!(io.read_ready().await.unwrap(), Some(()));
        assert!(matches!(
            lazy(|cx| io.poll_read_ready(cx)).await,
            Poll::Ready(Ok(Some(())))
        ));

        let item = io.with_read_buf(BytesMut::take);
        assert_eq!(item, Bytes::from_static(BIN));

        client.write(TEXT);
        sleep(Millis(50)).await;
        assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_ready());
        assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_ready());
    }
1116
    /// Read back-pressure is set when enough data arrives and is released by `recv`.
    #[ntex::test]
    async fn read_backpressure() {
        let (client, server) = IoTest::create();

        let io = Io::new(
            server,
            SharedCfg::new("SRV").add(IoConfig::default().set_read_buf(64, 32, 12)),
        );
        assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_pending());

        // two 32-byte writes trigger back-pressure
        client.write(BIN2);
        client.write(BIN2);
        sleep(Millis(50)).await;
        assert!(io.flags().is_read_ready());
        assert!(io.flags().is_rd_backpressure());
        let _item = io.recv(&BytesCodec).await.ok().unwrap().unwrap();
        assert!(!io.flags().is_read_ready());
        assert!(!io.flags().is_rd_backpressure());

        // the same holds when waiting via `read_ready`
        client.write(BIN2);
        client.write(BIN2);
        sleep(Millis(50)).await;
        assert!(io.flags().is_read_ready());
        assert!(io.flags().is_rd_backpressure());
        assert_eq!(io.read_ready().await.unwrap(), Some(()));
    }
1143
    /// Walks the write path: flag and waker state as seen from the dispatcher
    /// side (`io`) and the io-task side (`ctx`) while data is buffered,
    /// written out and flushed.
    #[ntex::test]
    async fn write() {
        let io = Io::new(
            IoTest::create().0,
            SharedCfg::new("SRV").add(IoConfig::default().set_write_buf(8, 4, 16)),
        );
        assert!(lazy(|cx| io.poll_status_update(cx)).await.is_pending());
        assert!(io.st().dispatch_task.is_set());
        assert!(io.st().flags.is_direct_wr_enabled());

        let ctx = IoContext::new(io.get_ref());

        // nothing to write yet: the write task parks itself
        assert_eq!(lazy(|cx| ctx.poll_write_ready(cx)).await, Poll::Pending);
        assert!(io.st().write_task.is_set());
        assert!(io.st().flags.is_write_paused());
        assert!(!io.st().flags.is_wr_backpressure());

        // 4 bytes: a send is scheduled, no back-pressure yet
        io.with_write_buf(|buf| buf.put_slice(b"1234")).unwrap();
        assert_eq!(lazy(|cx| ctx.poll_write_ready(cx)).await, Poll::Pending);
        assert!(io.st().flags.is_write_paused());
        assert!(io.st().flags.is_wr_send_scheduled());
        assert!(!io.st().flags.is_wr_backpressure());
        assert!(io.st().dispatch_task.is_set());

        // 8 bytes buffered: back-pressure is enabled and the dispatcher is woken
        io.with_write_buf(|buf| buf.put_slice(b"5678")).unwrap();
        assert!(io.st().flags.is_wr_backpressure());
        assert!(!io.st().dispatch_task.is_set());
        assert!(io.st().write_task.is_set());
        assert!(matches!(
            lazy(|cx| io.poll_status_update(cx)).await,
            Poll::Ready(IoStatusUpdate::WriteBackpressure)
        ));
        assert!(lazy(|cx| io.poll_flush(cx, false)).await.is_pending());
        assert!(!io.st().flags.is_write_flush());

        // running the scheduled ops wakes the write task
        Iops::run();
        assert!(!io.st().flags.is_wr_send_scheduled());
        assert!(!io.st().flags.is_write_paused());
        assert!(!io.st().write_task.is_set());
        assert_eq!(
            lazy(|cx| ctx.poll_write_ready(cx)).await,
            Poll::Ready(Readiness::Ready)
        );

        // the io task writes out the first 4 bytes
        assert_eq!(ctx.with_write_buf(|buf| buf.split_to(4).freeze()), b"1234");
        assert_eq!(ctx.update_write_status(Ok(true)), IoTaskStatus::Io);
        assert_eq!(
            lazy(|cx| ctx.poll_write_ready(cx)).await,
            Poll::Ready(Readiness::Ready)
        );
        assert!(!io.st().flags.is_write_paused());
        assert!(io.st().flags.is_wr_backpressure());
        // back-pressure is reported once more, then the flag is cleared
        assert!(matches!(
            lazy(|cx| io.poll_status_update(cx)).await,
            Poll::Ready(IoStatusUpdate::WriteBackpressure)
        ));
        assert!(!io.st().flags.is_wr_backpressure());
        assert!(lazy(|cx| io.poll_status_update(cx)).await.is_pending());
        assert!(matches!(
            lazy(|cx| io.poll_flush(cx, false)).await,
            Poll::Ready(Ok(()))
        ));

        // a full flush stays pending while data is buffered
        io.with_write_buf(|buf| buf.put_slice(b"1234")).unwrap();
        assert!(lazy(|cx| io.poll_flush(cx, true)).await.is_pending());
        assert!(io.st().flags.is_write_flush());
        assert!(io.st().flags.is_wr_backpressure());

        Iops::run();
        assert_eq!(ctx.with_write_buf(BytePages::freeze), b"56781234");
        assert!(!io.st().flags.is_wr_send_scheduled());
        assert_eq!(ctx.update_write_status(Ok(true)), IoTaskStatus::Pause);
        assert!(io.st().flags.is_write_paused());
        assert!(io.st().flags.is_write_flush());
        assert!(io.st().flags.is_wr_backpressure());
        assert!(!io.st().dispatch_task.is_set());

        // with an empty buffer the flush completes and clears both flags
        assert!(matches!(
            lazy(|cx| io.poll_flush(cx, false)).await,
            Poll::Ready(Ok(()))
        ));
        assert!(!io.st().flags.is_write_flush());
        assert!(!io.st().flags.is_wr_backpressure());

        // after termination the write task terminates and flush reports NotConnected
        io.terminate();
        assert!(!io.st().write_task.is_set());
        assert_eq!(
            lazy(|cx| ctx.poll_write_ready(cx)).await,
            Poll::Ready(Readiness::Terminate)
        );
        let Poll::Ready(Err(err)) = lazy(|cx| io.poll_flush(cx, false)).await else {
            panic!()
        };
        assert_eq!(err.kind(), io::ErrorKind::NotConnected);
        assert!(matches!(
            lazy(|cx| io.poll_status_update(cx)).await,
            Poll::Ready(IoStatusUpdate::PeerGone(None))
        ));
    }
1287
    /// Write back-pressure is set while the peer cannot accept data and is
    /// cleared after the data has been delivered.
    #[ntex::test]
    async fn write_backpressure() {
        let (client, server) = IoTest::create();
        // the peer accepts nothing at first
        client.remote_buffer_cap(0);

        let io = Io::new(
            server,
            SharedCfg::new("SRV").add(IoConfig::default().set_write_buf(16, 8, 12)),
        );
        assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_pending());
        assert!(io.flags().is_write_paused());
        assert!(!io.flags().is_wr_backpressure());
        assert!(!io.is_wr_backpressure());

        // 32 bytes buffered: back-pressure is enabled
        io.encode_slice(BIN2).unwrap();
        assert!(Iops::is_registered(&io));
        assert!(io.flags().is_wr_backpressure());

        // once the peer reads the data, the next status poll clears the flag
        client.remote_buffer_cap(1024);
        let item = client.read().await.unwrap();
        assert_eq!(item, BIN2);
        assert!(io.flags().is_wr_backpressure());
        assert!(matches!(
            lazy(|cx| io.poll_status_update(cx)).await,
            Poll::Ready(IoStatusUpdate::WriteBackpressure)
        ));
        assert!(!io.flags().is_wr_backpressure());
        assert!(matches!(
            lazy(|cx| io.poll_flush(cx, false)).await,
            Poll::Ready(Ok(()))
        ));
        assert!(!io.flags().is_wr_backpressure());
    }
1321
    /// Closing and stopping an io object: flag transitions and the errors
    /// returned by `with_write_buf` afterwards.
    #[ntex::test]
    async fn shutdown() {
        /// Pass-through filter layer used only by this test.
        #[derive(Debug)]
        struct F;

        impl FilterLayer for F {
            fn process_read_buf(&self, _: &FilterBuf<'_>) -> io::Result<()> {
                Ok(())
            }
            fn process_write_buf(&self, _: &FilterBuf<'_>) -> io::Result<()> {
                Ok(())
            }
        }

        let io = Io::new(
            IoTest::create().0,
            SharedCfg::new("SRV").add(IoConfig::default().set_write_buf(8, 4, 16)),
        );
        let st = io.st();
        assert!(lazy(|cx| io.poll_status_update(cx)).await.is_pending());
        assert!(st.dispatch_task.is_set());
        assert!(!st.flags.is_closed());
        assert!(!st.flags.is_stopping_filters());

        let ctx = IoContext::new(io.get_ref());

        // `close` starts filter shutdown without closing the io yet
        io.close();
        assert!(!st.flags.is_closed());
        assert!(st.flags.is_stopping_filters());
        let err = io.with_write_buf(|_| 1).unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::Other);

        let io = io.add_filter(F);
        let layer = Layer::new(F, Base::new(io.get_ref()));

        // shutting the layer down leaves no data in the write buffer
        let st = io.st();
        st.buffer.with_write_src(|p| p.put_slice(b"123"));
        assert_eq!(st.buffer.write_buf_size(), 3);
        let res = st.buffer.with_filter(io.as_ref(), |f| layer.shutdown(f));
        assert!(matches!(res, Ok(Poll::Ready(()))));
        assert_eq!(st.buffer.write_buf_size(), 0);

        // stopping from the io-task side closes and terminates the io
        ctx.stop(None);
        assert!(st.flags.is_closed());
        assert!(st.flags.is_terminated());
        assert!(st.flags.is_stopping_filters());

        let err = io.with_write_buf(|_| 1).unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::NotConnected);
    }
1376}