1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use crate::{event, inet::ExplicitCongestionNotification, path};
use core::{
task::{Context, Poll},
time::Duration,
};
pub mod handle_map;
pub mod router;
/// A channel over which packets are transmitted by an IO provider
///
/// Implementations expose readiness polling ([`Tx::poll_ready`]), access to the
/// transmission [`Queue`] ([`Tx::queue`]) and error handling ([`Tx::handle_error`]).
pub trait Tx: Sized {
/// The path handle type; the channel's queue is required to use the same
/// type as its [`Queue::Handle`]
type PathHandle;
// TODO make this generic over lifetime
// See https://github.com/aws/s2n-quic/issues/1742
/// The queue type that is handed to the callback passed to [`Tx::queue`]
type Queue: Queue<Handle = Self::PathHandle>;
/// The error type yielded by [`Tx::poll_ready`] and consumed by [`Tx::handle_error`]
type Error;
/// Returns a future that yields after a packet is ready to be transmitted
///
/// The returned `TxReady` wraps a mutable borrow of this channel.
#[inline]
fn ready(&mut self) -> TxReady<Self> {
TxReady(self)
}
/// Polls the IO provider for capacity to send a packet
///
/// Resolves to `Ok(())` or to the channel's [`Tx::Error`].
fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>>;
/// Calls the provided callback with the IO provider queue
///
/// The callback receives a mutable reference to the queue and is called at most
/// once (`FnOnce`).
fn queue<F: FnOnce(&mut Self::Queue)>(&mut self, f: F);
/// Handles the queue error and potentially publishes an event
///
/// Takes `self` by value, so the channel is consumed by this call.
fn handle_error<E: event::EndpointPublisher>(self, error: Self::Error, event: &mut E);
}
// Provides the `TxReady` type that is returned by `Tx::ready`.
// NOTE(review): `impl_ready_future!` is defined outside of this file; presumably the
// generated future drives `Tx::poll_ready` and resolves to `Result<(), T::Error>` -
// confirm against the macro definition.
impl_ready_future!(Tx, TxReady, Result<(), T::Error>);
/// Combinators that are available on every [`Tx`] channel
pub trait TxExt: Tx {
    /// Pairs this channel with `other` and routes messages into one or the other
    ///
    /// In the returned [`router::Channel`], `self` is stored as side `a` and
    /// `other` as side `b`; `router` is stored alongside them.
    #[inline]
    fn with_router<Router, Other>(
        self,
        router: Router,
        other: Other,
    ) -> router::Channel<Router, Self, Other>
    where
        Router: router::Router,
        Other: Tx,
    {
        let (a, b) = (self, other);
        router::Channel { router, a, b }
    }

    /// Wraps this channel so a different `Handle` type can be used with it
    ///
    /// `map` converts a borrowed `Handle` into this channel's own
    /// [`Tx::PathHandle`].
    #[inline]
    fn with_handle_map<Map, Handle>(self, map: Map) -> handle_map::Channel<Map, Self, Handle>
    where
        Map: Fn(&Handle) -> Self::PathHandle,
    {
        let tx = self;
        handle_map::Channel {
            map,
            tx,
            handle: Default::default(),
        }
    }
}
/// Blanket implementation: every type that implements [`Tx`] automatically
/// receives the [`TxExt`] combinators
impl<T: Tx> TxExt for T {}
/// A queue that accepts messages and transmits them
pub trait Queue {
    /// The path handle type carried by every message pushed into this queue
    type Handle: path::Handle;

    /// `true` when the queue is able to set ECN markings (defaults to `false`)
    const SUPPORTS_ECN: bool = false;

    /// `true` when the queue is able to pace the sending of messages (defaults to `false`)
    const SUPPORTS_PACING: bool = false;

    /// `true` when the queue is able to set IPv6 flow labels (defaults to `false`)
    const SUPPORTS_FLOW_LABELS: bool = false;

    /// Enqueues `message` for transmission
    ///
    /// On success the returned [`Outcome`] carries the index of the message so
    /// that further operations, e.g. encryption, can be performed on it.
    fn push<M: Message<Handle = Self::Handle>>(&mut self, message: M) -> Result<Outcome, Error>;

    /// Flushes any messages that are still pending in the TX queue
    ///
    /// Call this between multiple connections so that GSO segments are not
    /// shared across them. The provided implementation does nothing.
    #[inline]
    fn flush(&mut self) {
        // intentionally empty: queues without pending state need no flushing
    }

    /// Returns how many more datagrams can currently be transmitted
    fn capacity(&self) -> usize;

    /// Returns `true` while the queue can still accept additional transmissions
    #[inline]
    fn has_capacity(&self) -> bool {
        self.capacity() > 0
    }
}
/// The value returned by a successful [`Queue::push`]
///
/// The type consists of two plain `usize` fields, so it derives the common
/// value-type traits (`Clone`, `Copy`, `Debug`, `PartialEq`, `Eq`), which lets it
/// be logged and compared in assertions.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Outcome {
    /// NOTE(review): presumably the number of payload bytes written for the
    /// message - confirm against the `Queue` implementations
    pub len: usize,
    /// The index of the pushed message, which enables further operations
    /// (e.g. encryption) to be performed on it
    pub index: usize,
}
/// Errors that can be returned while pushing a message into a transmission [`Queue`]
/// or while writing its payload into a [`PayloadBuffer`]
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq)]
pub enum Error {
    /// The provided message did not write a payload
    EmptyPayload,
    /// The provided buffer was too small for the desired payload
    UndersizedBuffer,
    /// The transmission queue is at capacity
    AtCapacity,
}

/// Human-readable descriptions, so the error can be logged or shown with `{}`
///
/// Only `core` is used, which keeps the implementation usable without `std`.
impl core::fmt::Display for Error {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let description = match self {
            Self::EmptyPayload => "the provided message did not write a payload",
            Self::UndersizedBuffer => "the provided buffer was too small for the desired payload",
            Self::AtCapacity => "the transmission queue is at capacity",
        };
        f.write_str(description)
    }
}
/// Abstraction over a message to be sent on a socket
///
/// Instead of a concrete struct with eagerly evaluated fields,
/// using trait callbacks ensure messages only need to compute what
/// the actual transmission queue requires. For example, if the transmission
/// queue cannot set ECN markings, it will not call the [`Message::ecn`] function.
pub trait Message {
/// The path handle type that identifies where the message is sent
type Handle: path::Handle;
/// Returns the path handle on which this message should be sent
fn path_handle(&self) -> &Self::Handle;
/// Returns the ECN markings for the message
fn ecn(&mut self) -> ExplicitCongestionNotification;
/// Returns the Duration for which the message will be delayed.
///
/// This is used in scenarios where packets need to be paced.
fn delay(&mut self) -> Duration;
/// Returns the IPv6 flow label for the message
fn ipv6_flow_label(&mut self) -> u32;
/// Returns true if the packet can be used in a GSO packet
///
/// NOTE(review): `segment_len` and `segment_count` presumably describe the GSO
/// packet that is currently being assembled by the queue - confirm against the
/// queue implementations.
fn can_gso(&self, segment_len: usize, segment_count: usize) -> bool;
/// Writes the payload of the message to an output buffer
///
/// The tuple implementations in this file return the number of bytes written
/// on success and ignore `gso_offset`.
/// NOTE(review): the exact meaning of `gso_offset` is defined by the caller -
/// confirm against the queue implementations.
fn write_payload(&mut self, buffer: PayloadBuffer, gso_offset: usize) -> Result<usize, Error>;
}
/// A mutable byte slice into which a message payload is written
#[derive(Debug)]
pub struct PayloadBuffer<'a>(&'a mut [u8]);

impl<'a> PayloadBuffer<'a> {
    /// Wraps `bytes` so a payload can be written into it
    #[inline]
    pub fn new(bytes: &'a mut [u8]) -> Self {
        PayloadBuffer(bytes)
    }

    /// Unwraps the buffer, handing back the underlying mutable slice
    ///
    /// # Safety
    ///
    /// This function should only be used in the case that the writer has its own safety checks in place
    #[inline]
    pub unsafe fn into_mut_slice(self) -> &'a mut [u8] {
        let Self(bytes) = self;
        bytes
    }

    /// Copies `bytes` to the start of the buffer and returns how many bytes were written
    ///
    /// # Errors
    ///
    /// * [`Error::EmptyPayload`] when `bytes` is empty
    /// * [`Error::UndersizedBuffer`] when `bytes` is longer than the buffer
    ///
    /// # Panics
    ///
    /// When debug assertions are enabled, an undersized buffer triggers a panic
    /// instead of returning the error.
    #[track_caller]
    #[inline]
    pub fn write(&mut self, bytes: &[u8]) -> Result<usize, Error> {
        let len = bytes.len();
        if len == 0 {
            return Err(Error::EmptyPayload);
        }

        match self.0.get_mut(..len) {
            Some(destination) => {
                destination.copy_from_slice(bytes);
                Ok(len)
            }
            None => {
                // an undersized buffer is treated as a bug while debug assertions are on
                debug_assert!(
                    false,
                    "tried to write more bytes than was available in the buffer"
                );
                Err(Error::UndersizedBuffer)
            }
        }
    }
}
impl<Handle: path::Handle, Payload: AsRef<[u8]>> Message for (Handle, Payload) {
type Handle = Handle;
fn path_handle(&self) -> &Self::Handle {
&self.0
}
fn ecn(&mut self) -> ExplicitCongestionNotification {
Default::default()
}
fn delay(&mut self) -> Duration {
Default::default()
}
fn ipv6_flow_label(&mut self) -> u32 {
0
}
fn can_gso(&self, segment_len: usize, _segment_count: usize) -> bool {
segment_len >= self.1.as_ref().len()
}
fn write_payload(
&mut self,
mut buffer: PayloadBuffer,
_gso_offset: usize,
) -> Result<usize, Error> {
buffer.write(self.1.as_ref())
}
}
/// A `(handle, ecn, payload)` triple is a message with the given ECN markings,
/// a default (zero) delay and a flow label of `0`
impl<Handle: path::Handle, Payload: AsRef<[u8]>> Message
    for (Handle, ExplicitCongestionNotification, Payload)
{
    type Handle = Handle;

    /// The first tuple element is the path handle
    fn path_handle(&self) -> &Self::Handle {
        let (handle, _, _) = self;
        handle
    }

    /// The second tuple element is the ECN marking
    fn ecn(&mut self) -> ExplicitCongestionNotification {
        let (_, ecn, _) = self;
        *ecn
    }

    /// Always a zero delay
    fn delay(&mut self) -> Duration {
        Duration::ZERO
    }

    /// Always `0`
    fn ipv6_flow_label(&mut self) -> u32 {
        0
    }

    /// The payload can be used as long as it fits into a single segment
    fn can_gso(&self, segment_len: usize, _segment_count: usize) -> bool {
        let payload_len = self.2.as_ref().len();
        payload_len <= segment_len
    }

    /// Copies the third tuple element into `buffer`; the GSO offset is not used
    fn write_payload(
        &mut self,
        mut buffer: PayloadBuffer,
        _gso_offset: usize,
    ) -> Result<usize, Error> {
        let payload = self.2.as_ref();
        buffer.write(payload)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::inet::SocketAddressV4;

    /// Builds the path tuple that is shared by the tests in this module
    fn test_tuple() -> path::Tuple {
        let remote_address = SocketAddressV4::new([127, 0, 0, 1], 80).into();
        let local_address = SocketAddressV4::new([192, 168, 0, 1], 3000).into();
        path::Tuple {
            remote_address,
            local_address,
        }
    }

    /// A `(handle, payload)` tuple reports default metadata and writes its payload
    #[test]
    fn message_tuple_test() {
        let tuple = test_tuple();
        let mut message = (tuple, [1u8, 2, 3]);
        let mut buffer = [0u8; 10];
        assert_eq!(*message.path_handle(), tuple);
        assert_eq!(message.ecn(), Default::default());
        assert_eq!(message.delay(), Default::default());
        assert_eq!(message.ipv6_flow_label(), 0);
        // the payload only qualifies for GSO when it fits into a segment
        assert!(message.can_gso(3, 1));
        assert!(!message.can_gso(2, 1));
        assert_eq!(
            message.write_payload(PayloadBuffer::new(&mut buffer[..]), 0),
            Ok(3)
        );
        // the payload is copied to the front; the remainder is left untouched
        assert_eq!(buffer[..3], [1u8, 2, 3]);
        assert_eq!(buffer[3..], [0u8; 7]);
    }

    /// A message without any payload bytes is rejected
    #[test]
    fn message_tuple_empty_payload_test() {
        let mut message = (test_tuple(), [0u8; 0]);
        let mut buffer = [0u8; 10];
        assert_eq!(
            message.write_payload(PayloadBuffer::new(&mut buffer[..]), 0),
            Err(Error::EmptyPayload)
        );
    }

    /// An undersized buffer panics when debug assertions are enabled and
    /// returns an error otherwise
    ///
    /// `should_panic` is only applied when debug assertions are on, because the
    /// `debug_assert!` in `PayloadBuffer::write` compiles out otherwise (e.g. with
    /// `cargo test --release`) and the call then returns the error instead.
    #[test]
    #[cfg_attr(debug_assertions, should_panic)]
    fn message_tuple_undersized_test() {
        let mut message = (test_tuple(), [1u8, 2, 3]);
        assert_eq!(
            message.write_payload(PayloadBuffer::new(&mut [][..]), 0),
            Err(Error::UndersizedBuffer)
        );
    }
}