s2n_quic_platform/socket/task/
rx.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{
5    message::Message,
6    socket::{ring::Producer, stats, task::events},
7};
8use core::{
9    future::Future,
10    pin::Pin,
11    task::{Context, Poll},
12};
13use s2n_quic_core::task::cooldown::Cooldown;
14
15pub use events::RxEvents as Events;
16
17pub trait Socket<T: Message> {
18    type Error;
19
20    fn recv(
21        &mut self,
22        cx: &mut Context,
23        entries: &mut [T],
24        events: &mut Events,
25        stats: &stats::Sender,
26    ) -> Result<(), Self::Error>;
27}
28
29pub struct Receiver<T: Message, S: Socket<T>> {
30    ring: Producer<T>,
31    /// Implementation of a socket that fills free slots in the ring buffer
32    rx: S,
33    ring_cooldown: Cooldown,
34    io_cooldown: Cooldown,
35    stats: stats::Sender,
36    has_registered_drop_waker: bool,
37}
38
39impl<T, S> Receiver<T, S>
40where
41    T: Message + Unpin,
42    S: Socket<T> + Unpin,
43{
44    #[inline]
45    pub fn new(ring: Producer<T>, rx: S, cooldown: Cooldown, stats: stats::Sender) -> Self {
46        Self {
47            ring,
48            rx,
49            ring_cooldown: cooldown.clone(),
50            io_cooldown: cooldown,
51            stats,
52            has_registered_drop_waker: false,
53        }
54    }
55
56    #[inline]
57    fn poll_ring(&mut self, watermark: u32, cx: &mut Context) -> Poll<Result<(), ()>> {
58        loop {
59            let is_loop = self.ring_cooldown.state().is_loop();
60
61            let count = if is_loop {
62                self.ring.acquire(watermark)
63            } else {
64                match self.ring.poll_acquire(watermark, cx) {
65                    Poll::Ready(count) => count,
66                    Poll::Pending if !self.ring.is_open() => return Err(()).into(),
67                    Poll::Pending => 0,
68                }
69            };
70
71            // if the number of free slots increased since last time then yield
72            if count > 0 {
73                self.ring_cooldown.on_ready();
74                return Ok(()).into();
75            }
76
77            if is_loop && self.ring_cooldown.on_pending_task(cx).is_sleep() {
78                continue;
79            }
80
81            return Poll::Pending;
82        }
83    }
84}
85
86impl<T, S> Future for Receiver<T, S>
87where
88    T: Message + Unpin,
89    S: Socket<T> + Unpin,
90{
91    type Output = Option<S::Error>;
92
93    #[inline]
94    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
95        let this = self.get_mut();
96
97        if !this.has_registered_drop_waker {
98            this.has_registered_drop_waker = true;
99            this.ring.register_drop_waker(cx);
100        }
101
102        let mut events = Events::default();
103
104        let mut pending_wake = false;
105
106        while !events.take_blocked() {
107            match this.poll_ring(u32::MAX, cx) {
108                Poll::Ready(Ok(_)) => {}
109                Poll::Ready(Err(_)) => return None.into(),
110                Poll::Pending => {
111                    if pending_wake {
112                        this.ring.wake();
113                    }
114                    return Poll::Pending;
115                }
116            }
117
118            let entries = this.ring.data();
119
120            // perform the recv syscall
121            match this.rx.recv(cx, entries, &mut events, &this.stats) {
122                Ok(_) => {
123                    // increment the number of received messages
124                    let count = events.take_count() as u32;
125
126                    if count > 0 {
127                        this.ring.release_no_wake(count);
128                        this.io_cooldown.on_ready();
129                        pending_wake = true;
130                    }
131                }
132                Err(err) => return Some(err).into(),
133            }
134        }
135
136        this.io_cooldown.on_pending_task(cx);
137
138        if pending_wake {
139            this.ring.wake();
140        }
141
142        if !this.ring.is_open() {
143            return Poll::Ready(None);
144        }
145        Poll::Pending
146    }
147}