s2n_quic_core/io/event_loop/
select.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::endpoint::CloseError;
5use core::{
6    future::Future,
7    pin::Pin,
8    task::{Context, Poll},
9};
10use pin_project_lite::pin_project;
11
pin_project!(
    /// The main event loop future for selecting readiness of sub-tasks
    ///
    /// This future ensures all sub-tasks are polled fairly by yielding once
    /// after completing any of the sub-tasks. This is especially important when the TX queue is
    /// flushed quickly and we never get notified of the RX socket having packets to read.
    ///
    /// All four sub-tasks are stored inline and marked `#[pin]`, so they are polled in place
    /// through the projection generated by `pin_project!` and are never moved once the
    /// `Select` itself has been pinned.
    pub struct Select<Rx, Tx, Wakeup, Sleep>
    where
        Rx: Future,
        Tx: Future,
        Wakeup: Future,
        Sleep: Future,
    {
        // Sub-task whose output is surfaced as `Outcome::rx_result`.
        #[pin]
        rx: Rx,
        // Sub-task whose output is surfaced as `Outcome::tx_result`.
        #[pin]
        tx: Tx,
        // Sub-task whose completion sets `Outcome::application_wakeup`; an `Err` output
        // becomes the error of the whole `Select`.
        #[pin]
        wakeup: Wakeup,
        // Sub-task whose output is discarded; only its readiness is recorded, as
        // `Outcome::timeout_expired`.
        #[pin]
        sleep: Sleep,
    }
);
35
36impl<Rx, Tx, Wakeup, Sleep> Select<Rx, Tx, Wakeup, Sleep>
37where
38    Rx: Future,
39    Tx: Future,
40    Wakeup: Future,
41    Sleep: Future,
42{
43    #[inline(always)]
44    pub fn new(rx: Rx, tx: Tx, wakeup: Wakeup, sleep: Sleep) -> Self {
45        Self {
46            rx,
47            tx,
48            wakeup,
49            sleep,
50        }
51    }
52}
53
/// Summary of which sub-tasks were ready during one completed poll of [`Select`].
///
/// Several fields may be set at the same time: every sub-task is polled before the
/// outcome is produced, so more than one of them can be ready in the same pass.
#[derive(Debug)]
pub struct Outcome<Rx, Tx> {
    /// Output of the `rx` sub-task, or `None` if it was still pending.
    pub rx_result: Option<Rx>,
    /// Output of the `tx` sub-task, or `None` if it was still pending.
    pub tx_result: Option<Tx>,
    /// `true` if the `sleep` sub-task was ready.
    pub timeout_expired: bool,
    /// `true` if the `wakeup` sub-task completed with `Ok`.
    pub application_wakeup: bool,
}
61
/// Output of the [`Select`] future: an [`Outcome`] describing which sub-tasks were ready,
/// or the [`CloseError`] produced by the `wakeup` sub-task.
pub type Result<Rx, Tx> = core::result::Result<Outcome<Rx, Tx>, CloseError>;
63
64impl<Rx, Tx, Wakeup, Sleep> Future for Select<Rx, Tx, Wakeup, Sleep>
65where
66    Rx: Future,
67    Tx: Future,
68    Wakeup: Future<Output = core::result::Result<usize, CloseError>>,
69    Sleep: Future,
70{
71    type Output = Result<Rx::Output, Tx::Output>;
72
73    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
74        let this = self.project();
75
76        let mut should_wake = false;
77        let mut application_wakeup = false;
78
79        if let Poll::Ready(wakeup) = this.wakeup.poll(cx) {
80            should_wake = true;
81            application_wakeup = true;
82            if let Err(err) = wakeup {
83                return Poll::Ready(Err(err));
84            }
85        }
86
87        let mut rx_result = None;
88        if let Poll::Ready(v) = this.rx.poll(cx) {
89            should_wake = true;
90            rx_result = Some(v);
91        }
92
93        let mut tx_result = None;
94        if let Poll::Ready(v) = this.tx.poll(cx) {
95            should_wake = true;
96            tx_result = Some(v);
97        }
98
99        let mut timeout_expired = false;
100
101        if this.sleep.poll(cx).is_ready() {
102            timeout_expired = true;
103            should_wake = true;
104        }
105
106        // if none of the subtasks are ready, return
107        if !should_wake {
108            return Poll::Pending;
109        }
110
111        Poll::Ready(Ok(Outcome {
112            rx_result,
113            tx_result,
114            timeout_expired,
115            application_wakeup,
116        }))
117    }
118}