mod behavior;
mod segment;
mod slice;
pub use behavior::Behavior;
pub(crate) use segment::Segment;
pub use slice::Slice;
pub type Free<'a, M> = Slice<'a, M, behavior::Free>;
pub type Occupied<'a, M> = Slice<'a, M, behavior::Occupied>;
use crate::message;
use core::fmt;
use s2n_quic_core::path::LocalAddress;
pub struct Queue<Ring: message::Ring> {
ring: Ring,
occupied: Segment,
free: Segment,
local_address: LocalAddress,
}
impl<Ring> Default for Queue<Ring>
where
Ring: message::Ring + Default,
{
fn default() -> Self {
Self::new(Ring::default())
}
}
impl<Ring: message::Ring> Queue<Ring> {
pub fn new(ring: Ring) -> Self {
let capacity = ring.len();
let occupied = Segment {
index: 0,
len: 0,
capacity,
};
let free = Segment {
index: 0,
len: capacity,
capacity,
};
Self {
ring,
occupied,
free,
local_address: Default::default(),
}
}
pub fn set_local_address(&mut self, local_address: LocalAddress) {
self.local_address = local_address;
}
pub fn mtu(&self) -> usize {
self.ring.mtu()
}
pub fn max_gso(&self) -> usize {
self.ring.max_gso()
}
pub fn disable_gso(&mut self) {
self.ring.disable_gso()
}
pub fn capacity(&self) -> usize {
self.ring.len()
}
pub fn free_len(&self) -> usize {
self.free.len
}
pub fn occupied_len(&self) -> usize {
self.occupied.len
}
pub fn free_mut(&mut self) -> Free<Ring::Message> {
let mtu = self.mtu();
let max_gso = self.max_gso();
Slice {
messages: self.ring.as_mut_slice(),
primary: &mut self.free,
secondary: &mut self.occupied,
behavior: behavior::Free { mtu },
max_gso,
gso_segment: None,
local_address: &self.local_address,
}
}
pub fn occupied_mut(&mut self) -> Occupied<Ring::Message> {
let mtu = self.mtu();
let max_gso = self.max_gso();
Slice {
messages: self.ring.as_mut_slice(),
primary: &mut self.occupied,
secondary: &mut self.free,
behavior: behavior::Occupied { mtu },
max_gso,
gso_segment: None,
local_address: &self.local_address,
}
}
}
impl<Ring: message::Ring> fmt::Debug for Queue<Ring> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Queue")
.field("free_index", &self.free.index)
.field("free_len", &self.free.len)
.field("occupied_index", &self.occupied.index)
.field("occupied_len", &self.occupied.len)
.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{buffer::VecBuffer, message::Message};
use bolero::{check, generator::*};
use s2n_quic_core::inet;
use std::collections::VecDeque;
const MTU: usize = 1200;
fn set<M: Message>(message: &mut M, value: u8, len: usize) {
assert_eq!(
message.payload_len(),
MTU,
"payload len should be reset for free messages"
);
unsafe {
message.set_payload_len(len);
}
for b in message.payload_mut().iter_mut() {
*b = value;
}
}
fn gen_address() -> impl ValueGenerator<Output = inet::SocketAddress> {
gen()
}
#[derive(Clone, Copy, Debug, TypeGenerator)]
enum Operation {
Push {
#[generator(0..20)]
count: usize,
#[generator(1..32)]
len: usize,
#[generator(gen_address())]
address: inet::SocketAddress,
success: bool,
},
Pop {
#[generator(0..20)]
count: usize,
success: bool,
},
}
fn check<R: message::Ring>(mut queue: Queue<R>, capacity: usize, ops: &[Operation]) {
let mut oracle = VecDeque::new();
let mut value = 0u8;
for op in ops {
match *op {
Operation::Push {
count,
len,
address,
success,
} => {
let mut free = queue.free_mut();
let count = count.min(free.len());
let mut payload = value;
for message in &mut free[..count] {
set(message, payload, len);
message.set_remote_address(&address);
oracle.push_back((address, len, payload));
payload = payload.wrapping_add(1);
}
if success {
value = payload;
free.finish(count);
} else {
oracle.drain((oracle.len() - count)..);
free.cancel(count);
}
}
Operation::Pop { count, success } => {
let occupied = queue.occupied_mut();
let count = count.min(occupied.len());
if success {
occupied.finish(count);
oracle.drain(..count);
} else {
occupied.cancel(count);
}
}
}
assert_eq!(capacity, queue.capacity());
assert_eq!(capacity, queue.occupied_len() + queue.free_len());
let occupied = queue.occupied_mut();
assert_eq!(oracle.len(), occupied.len());
for (message, (address, len, value)) in occupied.iter().zip(oracle.iter()) {
let address = *address;
assert_eq!(message.remote_address(), Some(address));
assert_eq!(message.payload_len(), *len);
assert!(message.payload().iter().all(|v| v == value));
}
}
}
macro_rules! differential_test {
($name:ident, $ring:path) => {
#[test]
fn $name() {
check!()
.with_generator((0usize..16, gen::<Vec<Operation>>()))
.for_each(|(capacity, ops)| {
use $ring;
let payloads = VecBuffer::new(*capacity, MTU);
let max_gso = 1;
let ring = Ring::new(payloads, max_gso);
let queue = Queue::new(ring);
assert_eq!(queue.mtu(), MTU);
check(queue, *capacity, ops);
});
}
};
}
differential_test!(simple_differential_test, message::simple::Ring);
#[cfg(s2n_quic_platform_socket_msg)]
differential_test!(msg_differential_test, message::msg::Ring);
#[cfg(s2n_quic_platform_socket_mmsg)]
differential_test!(mmsg_differential_test, message::mmsg::Ring);
}