use crate::{event, inet::ExplicitCongestionNotification, path};
use core::{
task::{Context, Poll},
time::Duration,
};
pub mod handle_map;
pub mod router;
/// The transmitting half of an I/O provider.
///
/// An implementation exposes three things: a way to wait until it is ready
/// ([`Tx::ready`] / [`Tx::poll_ready`]), scoped mutable access to its [`Queue`]
/// ([`Tx::queue`]) and a sink for the errors it produces ([`Tx::handle_error`]).
pub trait Tx: Sized {
    /// Handle type used to address a path; it is also the `Handle` of [`Self::Queue`].
    type PathHandle;

    /// Queue type handed to the closure passed to [`Tx::queue`].
    type Queue: Queue<Handle = Self::PathHandle>;

    /// Error type yielded by [`Tx::poll_ready`] and accepted by [`Tx::handle_error`].
    type Error;

    /// Wraps a mutable borrow of `self` in a [`TxReady`] value.
    ///
    /// NOTE(review): `TxReady` is generated by `impl_ready_future!`; presumably it is a
    /// future that drives [`Tx::poll_ready`] — confirm against the macro definition.
    #[inline]
    fn ready(&mut self) -> TxReady<'_, Self> {
        TxReady(self)
    }

    /// Polls the channel for readiness.
    ///
    /// Completes with `Ok(())` or with a [`Self::Error`]. What exactly "ready" means is
    /// defined by the implementation.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;

    /// Passes mutable access to the channel's [`Queue`] to the callback `f`.
    fn queue<F>(&mut self, f: F)
    where
        F: FnOnce(&mut Self::Queue);

    /// Consumes the channel together with one of its errors.
    ///
    /// `event` is an endpoint event publisher made available to the implementation;
    /// whether and what it publishes is implementation-defined.
    fn handle_error<E>(self, error: Self::Error, event: &mut E)
    where
        E: event::EndpointPublisher;
}
// Invokes the `impl_ready_future!` macro (defined outside this view) with the `Tx` trait,
// the `TxReady` type name and the output type `Result<(), T::Error>`.
// NOTE(review): presumably this expansion is what defines the `TxReady` wrapper returned by
// `Tx::ready` and implements `Future` for it on top of `Tx::poll_ready` — confirm against
// the macro definition.
impl_ready_future!(Tx, TxReady, Result<(), T::Error>);
/// Combinators available on every [`Tx`] implementation.
pub trait TxExt: Tx {
    /// Pairs this channel with a second one behind a router.
    ///
    /// The returned [`router::Channel`] stores `self` as its `a` side, `other` as its
    /// `b` side and `router` as the value that sits in front of both. How messages are
    /// distributed between the two sides is decided by the [`router::Router`]
    /// implementation.
    #[inline]
    fn with_router<Router: router::Router, Other: Tx>(
        self,
        router: Router,
        other: Other,
    ) -> router::Channel<Router, Self, Other> {
        let (a, b) = (self, other);
        router::Channel { router, a, b }
    }

    /// Wraps this channel in a [`handle_map::Channel`] carrying the function `map`.
    ///
    /// `map` turns a borrowed `Handle` into this channel's own [`Tx::PathHandle`]. The
    /// `handle` field of the wrapper is filled with its `Default` value.
    #[inline]
    fn with_handle_map<Map, Handle>(self, map: Map) -> handle_map::Channel<Map, Self, Handle>
    where
        Map: Fn(&Handle) -> Self::PathHandle,
    {
        let tx = self;
        let handle = Default::default();
        handle_map::Channel { map, tx, handle }
    }
}
/// Blanket implementation: every type implementing [`Tx`] gets the [`TxExt`] methods.
impl<T> TxExt for T where T: Tx {}
pub trait Queue {
type Handle: path::Handle;
const SUPPORTS_ECN: bool = false;
const SUPPORTS_PACING: bool = false;
const SUPPORTS_FLOW_LABELS: bool = false;
fn push<M: Message<Handle = Self::Handle>>(&mut self, message: M) -> Result<Outcome, Error>;
#[inline]
fn flush(&mut self) {
}
fn capacity(&self) -> usize;
#[inline]
fn has_capacity(&self) -> bool {
self.capacity() != 0
}
}
/// Value returned by a successful [`Queue::push`].
///
/// The common value traits are derived so that a `Result<Outcome, Error>` can be
/// formatted, compared in assertions and unwrapped with `unwrap_err`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct Outcome {
    /// NOTE(review): presumably the number of payload bytes accepted for the message —
    /// confirm with the `Queue` implementations.
    pub len: usize,
    /// NOTE(review): presumably the position of the message inside the queue — confirm
    /// with the `Queue` implementations.
    pub index: usize,
}
/// Reasons a message could not be written or queued.
///
/// The derived ordering follows declaration order, so the variants must not be reordered.
#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd)]
pub enum Error {
    /// No payload bytes were supplied; `PayloadBuffer::write` returns this for an empty
    /// input slice.
    EmptyPayload,

    /// The destination buffer is shorter than the payload; `PayloadBuffer::write`
    /// returns this when the bytes do not fit.
    UndersizedBuffer,

    /// NOTE(review): presumably reported by a `Queue` that cannot accept another
    /// message — confirm with the `Queue` implementations.
    AtCapacity,
}
pub trait Message {
type Handle: path::Handle;
fn path_handle(&self) -> &Self::Handle;
fn ecn(&mut self) -> ExplicitCongestionNotification;
fn delay(&mut self) -> Duration;
fn ipv6_flow_label(&mut self) -> u32;
fn can_gso(&self, segment_len: usize, segment_count: usize) -> bool;
fn write_payload(&mut self, buffer: PayloadBuffer, gso_offset: usize) -> Result<usize, Error>;
}
/// A mutable byte slice that a message payload is written into.
#[derive(Debug)]
pub struct PayloadBuffer<'a>(&'a mut [u8]);

impl<'a> PayloadBuffer<'a> {
    /// Wraps `bytes` as the destination for a payload.
    #[inline]
    pub fn new(bytes: &'a mut [u8]) -> Self {
        Self(bytes)
    }

    /// Unwraps the buffer, giving back the full underlying slice.
    ///
    /// # Safety
    ///
    /// The body itself performs no unsafe operation. NOTE(review): the `unsafe` marker
    /// presumably signals that callers bypass the checks done by [`PayloadBuffer::write`]
    /// and must do their own — confirm the intended contract.
    #[inline]
    pub unsafe fn into_mut_slice(self) -> &'a mut [u8] {
        self.0
    }

    /// Copies `bytes` into the front of the buffer and returns how many bytes were copied.
    ///
    /// Every call writes starting at offset 0; the buffer is not advanced between calls.
    ///
    /// # Errors
    ///
    /// * [`Error::EmptyPayload`] when `bytes` is empty.
    /// * [`Error::UndersizedBuffer`] when the buffer is shorter than `bytes`.
    ///
    /// # Panics
    ///
    /// When debug assertions are enabled, the undersized case panics instead of
    /// returning the error.
    #[track_caller]
    #[inline]
    pub fn write(&mut self, bytes: &[u8]) -> Result<usize, Error> {
        let len = bytes.len();
        if len == 0 {
            return Err(Error::EmptyPayload);
        }

        match self.0.get_mut(..len) {
            Some(destination) => {
                destination.copy_from_slice(bytes);
                Ok(len)
            }
            None => {
                debug_assert!(
                    false,
                    "tried to write more bytes than was available in the buffer"
                );
                Err(Error::UndersizedBuffer)
            }
        }
    }
}
/// A `(path handle, payload)` pair is a message with default ECN, default delay and a
/// flow label of zero.
impl<Handle, Payload> Message for (Handle, Payload)
where
    Handle: path::Handle,
    Payload: AsRef<[u8]>,
{
    type Handle = Handle;

    /// Borrows the first tuple element.
    fn path_handle(&self) -> &Self::Handle {
        let (handle, _) = self;
        handle
    }

    /// Always the default ECN value.
    fn ecn(&mut self) -> ExplicitCongestionNotification {
        Default::default()
    }

    /// Always the default (zero) duration.
    fn delay(&mut self) -> Duration {
        Duration::default()
    }

    /// Always `0`.
    fn ipv6_flow_label(&mut self) -> u32 {
        0
    }

    /// True when the payload is no longer than `segment_len`; the segment count is ignored.
    fn can_gso(&self, segment_len: usize, _segment_count: usize) -> bool {
        let payload_len = self.1.as_ref().len();
        payload_len <= segment_len
    }

    /// Writes the whole payload to the front of `buffer`; the GSO offset is ignored.
    fn write_payload(
        &mut self,
        mut buffer: PayloadBuffer<'_>,
        _gso_offset: usize,
    ) -> Result<usize, Error> {
        let payload: &[u8] = self.1.as_ref();
        buffer.write(payload)
    }
}
/// A `(path handle, ECN, payload)` triple is a message with an explicit ECN value, the
/// default delay and a flow label of zero.
impl<Handle, Payload> Message for (Handle, ExplicitCongestionNotification, Payload)
where
    Handle: path::Handle,
    Payload: AsRef<[u8]>,
{
    type Handle = Handle;

    /// Borrows the first tuple element.
    fn path_handle(&self) -> &Self::Handle {
        let (handle, _, _) = self;
        handle
    }

    /// Returns a copy of the second tuple element.
    fn ecn(&mut self) -> ExplicitCongestionNotification {
        let (_, ecn, _) = self;
        *ecn
    }

    /// Always the default (zero) duration.
    fn delay(&mut self) -> Duration {
        Duration::default()
    }

    /// Always `0`.
    fn ipv6_flow_label(&mut self) -> u32 {
        0
    }

    /// True when the payload is no longer than `segment_len`; the segment count is ignored.
    fn can_gso(&self, segment_len: usize, _segment_count: usize) -> bool {
        let payload_len = self.2.as_ref().len();
        payload_len <= segment_len
    }

    /// Writes the whole payload to the front of `buffer`; the GSO offset is ignored.
    fn write_payload(
        &mut self,
        mut buffer: PayloadBuffer<'_>,
        _gso_offset: usize,
    ) -> Result<usize, Error> {
        let payload: &[u8] = self.2.as_ref();
        buffer.write(payload)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::inet::SocketAddressV4;

    /// Builds the IPv4 path tuple shared by the tests in this module.
    fn test_tuple() -> path::Tuple {
        path::Tuple {
            remote_address: SocketAddressV4::new([127, 0, 0, 1], 80).into(),
            local_address: SocketAddressV4::new([192, 168, 0, 1], 3000).into(),
        }
    }

    /// A `(handle, payload)` tuple reports default metadata and writes its payload.
    #[test]
    fn message_tuple_test() {
        let tuple = test_tuple();
        let mut message = (tuple, [1u8, 2, 3]);
        let mut buffer = [0u8; 10];

        assert_eq!(*message.path_handle(), tuple);
        assert_eq!(message.ecn(), Default::default());
        assert_eq!(message.delay(), Default::default());
        assert_eq!(message.ipv6_flow_label(), 0);

        // the payload is 3 bytes long, so it fits a segment of 3 but not one of 2
        assert!(message.can_gso(3, 1));
        assert!(!message.can_gso(2, 1));

        assert_eq!(
            message.write_payload(PayloadBuffer::new(&mut buffer[..]), 0),
            Ok(3)
        );
        assert_eq!(&buffer[..3], &[1u8, 2, 3][..]);
    }

    /// Writing into a buffer that is too small trips the debug assertion in
    /// `PayloadBuffer::write` and otherwise reports `Error::UndersizedBuffer`.
    ///
    /// The panic only exists when debug assertions are compiled in, so `should_panic` is
    /// gated on `debug_assertions`; without them (e.g. `cargo test --release`) the test
    /// checks the returned error instead of failing for lack of a panic. The `expected`
    /// message ensures an unrelated panic cannot satisfy the test.
    #[test]
    #[cfg_attr(
        debug_assertions,
        should_panic(expected = "tried to write more bytes than was available in the buffer")
    )]
    fn message_tuple_undersized_test() {
        let mut message = (test_tuple(), [1u8, 2, 3]);

        let result = message.write_payload(PayloadBuffer::new(&mut [][..]), 0);

        // only reached when debug assertions are disabled
        assert_eq!(result, Err(Error::UndersizedBuffer));
    }
}