s2n_quic_platform/socket/task/
rx.rs1use crate::{
5 message::Message,
6 socket::{ring::Producer, stats, task::events},
7};
8use core::{
9 future::Future,
10 pin::Pin,
11 task::{Context, Poll},
12};
13use s2n_quic_core::task::cooldown::Cooldown;
14
15pub use events::RxEvents as Events;
16
17pub trait Socket<T: Message> {
18 type Error;
19
20 fn recv(
21 &mut self,
22 cx: &mut Context,
23 entries: &mut [T],
24 events: &mut Events,
25 stats: &stats::Sender,
26 ) -> Result<(), Self::Error>;
27}
28
29pub struct Receiver<T: Message, S: Socket<T>> {
30 ring: Producer<T>,
31 rx: S,
33 ring_cooldown: Cooldown,
34 io_cooldown: Cooldown,
35 stats: stats::Sender,
36 has_registered_drop_waker: bool,
37}
38
39impl<T, S> Receiver<T, S>
40where
41 T: Message + Unpin,
42 S: Socket<T> + Unpin,
43{
44 #[inline]
45 pub fn new(ring: Producer<T>, rx: S, cooldown: Cooldown, stats: stats::Sender) -> Self {
46 Self {
47 ring,
48 rx,
49 ring_cooldown: cooldown.clone(),
50 io_cooldown: cooldown,
51 stats,
52 has_registered_drop_waker: false,
53 }
54 }
55
56 #[inline]
57 fn poll_ring(&mut self, watermark: u32, cx: &mut Context) -> Poll<Result<(), ()>> {
58 loop {
59 let is_loop = self.ring_cooldown.state().is_loop();
60
61 let count = if is_loop {
62 self.ring.acquire(watermark)
63 } else {
64 match self.ring.poll_acquire(watermark, cx) {
65 Poll::Ready(count) => count,
66 Poll::Pending if !self.ring.is_open() => return Err(()).into(),
67 Poll::Pending => 0,
68 }
69 };
70
71 if count > 0 {
73 self.ring_cooldown.on_ready();
74 return Ok(()).into();
75 }
76
77 if is_loop && self.ring_cooldown.on_pending_task(cx).is_sleep() {
78 continue;
79 }
80
81 return Poll::Pending;
82 }
83 }
84}
85
86impl<T, S> Future for Receiver<T, S>
87where
88 T: Message + Unpin,
89 S: Socket<T> + Unpin,
90{
91 type Output = Option<S::Error>;
92
93 #[inline]
94 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
95 let this = self.get_mut();
96
97 if !this.has_registered_drop_waker {
98 this.has_registered_drop_waker = true;
99 this.ring.register_drop_waker(cx);
100 }
101
102 let mut events = Events::default();
103
104 let mut pending_wake = false;
105
106 while !events.take_blocked() {
107 match this.poll_ring(u32::MAX, cx) {
108 Poll::Ready(Ok(_)) => {}
109 Poll::Ready(Err(_)) => return None.into(),
110 Poll::Pending => {
111 if pending_wake {
112 this.ring.wake();
113 }
114 return Poll::Pending;
115 }
116 }
117
118 let entries = this.ring.data();
119
120 match this.rx.recv(cx, entries, &mut events, &this.stats) {
122 Ok(_) => {
123 let count = events.take_count() as u32;
125
126 if count > 0 {
127 this.ring.release_no_wake(count);
128 this.io_cooldown.on_ready();
129 pending_wake = true;
130 }
131 }
132 Err(err) => return Some(err).into(),
133 }
134 }
135
136 this.io_cooldown.on_pending_task(cx);
137
138 if pending_wake {
139 this.ring.wake();
140 }
141
142 if !this.ring.is_open() {
143 return Poll::Ready(None);
144 }
145 Poll::Pending
146 }
147}