s2n_quic_core/io/
tx.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{event, inet::ExplicitCongestionNotification, path};
5use core::{
6    task::{Context, Poll},
7    time::Duration,
8};
9
10pub mod handle_map;
11pub mod router;
12
13pub trait Tx: Sized {
14    type PathHandle;
15    // TODO make this generic over lifetime
16    // See https://github.com/aws/s2n-quic/issues/1742
17    type Queue: Queue<Handle = Self::PathHandle>;
18    type Error;
19
20    /// Returns a future that yields after a packet is ready to be transmitted
21    #[inline]
22    fn ready(&mut self) -> TxReady<'_, Self> {
23        TxReady(self)
24    }
25
26    /// Polls the IO provider for capacity to send a packet
27    fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>>;
28
29    /// Calls the provided callback with the IO provider queue
30    fn queue<F: FnOnce(&mut Self::Queue)>(&mut self, f: F);
31
32    /// Handles the queue error and potentially publishes an event
33    fn handle_error<E: event::EndpointPublisher>(self, error: Self::Error, event: &mut E);
34}
35
36impl_ready_future!(Tx, TxReady, Result<(), T::Error>);
37
38/// Extension traits for Tx channels
39pub trait TxExt: Tx {
40    /// Routes messages into one channel or another
41    #[inline]
42    fn with_router<Router, Other>(
43        self,
44        router: Router,
45        other: Other,
46    ) -> router::Channel<Router, Self, Other>
47    where
48        Router: router::Router,
49        Other: Tx,
50    {
51        router::Channel {
52            router,
53            a: self,
54            b: other,
55        }
56    }
57
58    /// Maps one type of handle to another with a mapping function
59    #[inline]
60    fn with_handle_map<Map, Handle>(self, map: Map) -> handle_map::Channel<Map, Self, Handle>
61    where
62        Map: Fn(&Handle) -> Self::PathHandle,
63    {
64        handle_map::Channel {
65            map,
66            tx: self,
67            handle: Default::default(),
68        }
69    }
70}
71
72/// Implement the extension traits for all Tx queues
73impl<T: Tx> TxExt for T {}
74
75/// A structure capable of queueing and transmitting messages
76pub trait Queue {
77    type Handle: path::Handle;
78
79    /// Set to true if the queue supports setting ECN markings
80    const SUPPORTS_ECN: bool = false;
81
82    /// Set to true if the queue supports pacing of sending messages
83    const SUPPORTS_PACING: bool = false;
84
85    /// Set to true if the queue supports setting IPv6 flow labels
86    const SUPPORTS_FLOW_LABELS: bool = false;
87
88    /// Pushes a message into the transmission queue
89    ///
90    /// The index of the message is returned to enable further operations to be
91    /// performed, e.g. encryption.
92    fn push<M: Message<Handle = Self::Handle>>(&mut self, message: M) -> Result<Outcome, Error>;
93
94    /// Flushes any pending messages from the TX queue.
95    ///
96    /// This should be called between multiple connections to ensure GSO segments aren't shared.
97    #[inline]
98    fn flush(&mut self) {
99        // default as no-op
100    }
101
102    /// Returns the number of remaining datagrams that can be transmitted
103    fn capacity(&self) -> usize;
104
105    /// Returns `true` if the queue will accept additional transmissions
106    #[inline]
107    fn has_capacity(&self) -> bool {
108        self.capacity() != 0
109    }
110}
111
/// Value returned when a message has been successfully pushed into a queue
///
/// The common value-type traits are derived so the outcome can be logged, copied and
/// compared (e.g. in `assert_eq!` or `Result::unwrap_err`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Outcome {
    /// Length associated with the pushed message
    ///
    /// NOTE(review): presumably the number of payload bytes written - confirm against the
    /// queue implementations.
    pub len: usize,
    /// Index of the message, enabling further operations to be performed on it
    pub index: usize,
}
116
/// Reasons a message could not be written to, or accepted by, a transmission queue
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq)]
pub enum Error {
    /// The provided message did not write a payload
    EmptyPayload,

    /// The provided buffer was too small for the desired payload
    UndersizedBuffer,

    /// The transmission queue is at capacity
    AtCapacity,
}

/// Human-readable descriptions, so the error can be formatted with `{}`
///
/// Only `core::fmt` is used, so the implementation does not depend on `std`.
impl core::fmt::Display for Error {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        // exhaustive on purpose: a new variant must be given a description
        let description = match self {
            Self::EmptyPayload => "the provided message did not write a payload",
            Self::UndersizedBuffer => "the provided buffer was too small for the desired payload",
            Self::AtCapacity => "the transmission queue is at capacity",
        };
        f.write_str(description)
    }
}
128
129/// Abstraction over a message to be sent on a socket
130///
131/// Instead of a concrete struct with eagerly evaluated fields,
132/// using trait callbacks ensure messages only need to compute what
133/// the actual transmission queue requires. For example, if the transmission
134/// queue cannot set ECN markings, it will not call the [`Message::ecn`] function.
135pub trait Message {
136    type Handle: path::Handle;
137
138    /// Returns the path handle on which this message should be sent
139    fn path_handle(&self) -> &Self::Handle;
140
141    /// Returns the ECN markings for the message
142    fn ecn(&mut self) -> ExplicitCongestionNotification;
143
144    /// Returns the Duration for which the message will be delayed.
145    ///
146    /// This is used in scenarios where packets need to be paced.
147    fn delay(&mut self) -> Duration;
148
149    /// Returns the IPv6 flow label for the message
150    fn ipv6_flow_label(&mut self) -> u32;
151
152    /// Returns true if the packet can be used in a GSO packet
153    fn can_gso(&self, segment_len: usize, segment_count: usize) -> bool;
154
155    /// Writes the payload of the message to an output buffer
156    fn write_payload(&mut self, buffer: PayloadBuffer, gso_offset: usize) -> Result<usize, Error>;
157}
158
159#[derive(Debug)]
160pub struct PayloadBuffer<'a>(&'a mut [u8]);
161
162impl<'a> PayloadBuffer<'a> {
163    #[inline]
164    pub fn new(bytes: &'a mut [u8]) -> Self {
165        Self(bytes)
166    }
167
168    /// # Safety
169    ///
170    /// This function should only be used in the case that the writer has its own safety checks in place
171    #[inline]
172    pub unsafe fn into_mut_slice(self) -> &'a mut [u8] {
173        self.0
174    }
175
176    #[track_caller]
177    #[inline]
178    pub fn write(&mut self, bytes: &[u8]) -> Result<usize, Error> {
179        if bytes.is_empty() {
180            return Err(Error::EmptyPayload);
181        }
182
183        if let Some(buffer) = self.0.get_mut(0..bytes.len()) {
184            buffer.copy_from_slice(bytes);
185            Ok(bytes.len())
186        } else {
187            debug_assert!(
188                false,
189                "tried to write more bytes than was available in the buffer"
190            );
191            Err(Error::UndersizedBuffer)
192        }
193    }
194}
195
196impl<Handle: path::Handle, Payload: AsRef<[u8]>> Message for (Handle, Payload) {
197    type Handle = Handle;
198
199    fn path_handle(&self) -> &Self::Handle {
200        &self.0
201    }
202
203    fn ecn(&mut self) -> ExplicitCongestionNotification {
204        Default::default()
205    }
206
207    fn delay(&mut self) -> Duration {
208        Default::default()
209    }
210
211    fn ipv6_flow_label(&mut self) -> u32 {
212        0
213    }
214
215    fn can_gso(&self, segment_len: usize, _segment_count: usize) -> bool {
216        segment_len >= self.1.as_ref().len()
217    }
218
219    fn write_payload(
220        &mut self,
221        mut buffer: PayloadBuffer,
222        _gso_offset: usize,
223    ) -> Result<usize, Error> {
224        buffer.write(self.1.as_ref())
225    }
226}
227
228impl<Handle: path::Handle, Payload: AsRef<[u8]>> Message
229    for (Handle, ExplicitCongestionNotification, Payload)
230{
231    type Handle = Handle;
232
233    fn path_handle(&self) -> &Self::Handle {
234        &self.0
235    }
236
237    fn ecn(&mut self) -> ExplicitCongestionNotification {
238        self.1
239    }
240
241    fn delay(&mut self) -> Duration {
242        Default::default()
243    }
244
245    fn ipv6_flow_label(&mut self) -> u32 {
246        0
247    }
248
249    fn can_gso(&self, segment_len: usize, _segment_count: usize) -> bool {
250        segment_len >= self.2.as_ref().len()
251    }
252
253    fn write_payload(
254        &mut self,
255        mut buffer: PayloadBuffer,
256        _gso_offset: usize,
257    ) -> Result<usize, Error> {
258        buffer.write(self.2.as_ref())
259    }
260}
261
#[cfg(test)]
mod tests {
    use super::*;
    use crate::inet::SocketAddressV4;

    /// Builds the path handle shared by the tests in this module
    fn path_tuple() -> path::Tuple {
        path::Tuple {
            remote_address: SocketAddressV4::new([127, 0, 0, 1], 80).into(),
            local_address: SocketAddressV4::new([192, 168, 0, 1], 3000).into(),
        }
    }

    #[test]
    fn message_tuple_test() {
        let tuple = path_tuple();
        let mut message = (tuple, [1u8, 2, 3]);

        let mut buffer = [0u8; 10];

        assert_eq!(*message.path_handle(), tuple);
        assert_eq!(message.ecn(), Default::default());
        assert_eq!(message.delay(), Default::default());
        assert_eq!(message.ipv6_flow_label(), 0);
        assert_eq!(
            message.write_payload(PayloadBuffer::new(&mut buffer[..]), 0),
            Ok(3)
        );
        // the payload lands at the front of the buffer and the remainder is left untouched
        assert_eq!(buffer, [1, 2, 3, 0, 0, 0, 0, 0, 0, 0]);
    }

    #[test]
    fn message_tuple_empty_payload_test() {
        let mut message = (path_tuple(), [0u8; 0]);

        let mut buffer = [0u8; 10];

        assert_eq!(
            message.write_payload(PayloadBuffer::new(&mut buffer[..]), 0),
            Err(Error::EmptyPayload)
        );
    }

    // The undersized write is reported through a `debug_assert!`, which only panics when
    // debug assertions are enabled. Without them (e.g. `cargo test --release`) the call
    // returns an error instead, so the panic expectation has to be conditional.
    #[test]
    #[cfg_attr(debug_assertions, should_panic)]
    fn message_tuple_undersized_test() {
        let mut message = (path_tuple(), [1u8, 2, 3]);

        let result = message.write_payload(PayloadBuffer::new(&mut [][..]), 0);

        // only reached when debug assertions are disabled
        assert_eq!(result, Err(Error::UndersizedBuffer));
    }
}
303}