use super::{Behavior, Segment};
use crate::message;
use core::ops::{Deref, DerefMut};
use s2n_quic_core::{
io::{rx, tx},
path::{self, LocalAddress},
};
#[derive(Debug)]
pub struct Slice<'a, Message: message::Message, Behavior> {
pub(crate) messages: &'a mut [Message],
pub(crate) primary: &'a mut Segment,
pub(crate) secondary: &'a mut Segment,
pub(crate) behavior: Behavior,
pub(crate) max_gso: usize,
pub(crate) gso_segment: Option<GsoSegment>,
pub(crate) local_address: &'a LocalAddress,
}
#[derive(Debug, Default)]
pub struct GsoSegment {
index: usize,
count: usize,
size: usize,
}
impl<'a, Message: message::Message, B: Behavior> Slice<'a, Message, B> {
#[inline]
pub fn finish(mut self, count: usize) {
self.advance(count);
}
fn advance(&mut self, count: usize) {
debug_assert!(
count <= self.len(),
"cannot finish more messages than available"
);
self.flush_gso();
let (start, end, overflow, capacity) = self.compute_behavior_arguments(count);
let (primary, secondary) = self.messages.split_at_mut(capacity);
self.behavior
.advance(primary, secondary, start, end, overflow);
self.primary.move_into(self.secondary, count);
}
pub fn cancel(mut self, count: usize) {
self.flush_gso();
let (start, end, overflow, capacity) = self.compute_behavior_arguments(count);
let (primary, secondary) = self.messages.split_at_mut(capacity);
self.behavior
.cancel(primary, secondary, start, end, overflow);
}
#[inline]
fn compute_behavior_arguments(&self, count: usize) -> (usize, usize, usize, usize) {
let capacity = self.primary.capacity;
let prev_index = self.primary.index;
let new_index = prev_index + count;
let start = prev_index;
let end = new_index.min(capacity);
let overflow = new_index.saturating_sub(capacity);
(start, end, overflow, capacity)
}
}
impl<'a, Message: message::Message, B> Slice<'a, Message, B> {
#[inline]
fn flush_gso(&mut self) {
if !Message::SUPPORTS_GSO {
return;
}
if let Some(gso) = self.gso_segment.take() {
if gso.count > 1 {
let mid = self.messages.len() / 2;
let (primary, secondary) = self.messages.split_at_mut(mid);
let index = gso.index;
let (primary, secondary) = if let Some(index) = index.checked_sub(mid) {
let primary = &mut primary[index];
let secondary = &mut secondary[index];
(secondary, primary)
} else {
let primary = &mut primary[index];
let secondary = &mut secondary[index];
(primary, secondary)
};
primary.set_segment_size(gso.size);
secondary.replicate_fields_from(primary);
}
}
}
#[inline]
fn try_gso<M: tx::Message<Handle = Message::Handle>>(
&mut self,
mut message: M,
) -> Result<Result<tx::Outcome, M>, tx::Error> {
if !Message::SUPPORTS_GSO {
return Ok(Err(message));
}
let gso = if let Some(gso) = self.gso_segment.as_mut() {
gso
} else {
return Ok(Err(message));
};
let max_segments = self.max_gso;
debug_assert!(
max_segments > 1,
"gso_segment should only be set when max_gso > 1"
);
let prev_message = &mut self.messages[gso.index];
if !(message.can_gso(gso.size, gso.count) && prev_message.can_gso(&mut message)) {
self.flush_gso();
return Ok(Err(message));
}
debug_assert!(
gso.count < max_segments,
"{} cannot exceed {}",
gso.count,
max_segments
);
let payload_len = prev_message.payload_len();
unsafe {
prev_message.set_payload_len(payload_len + gso.size);
}
let buffer = &mut message::Message::payload_mut(prev_message)[payload_len..];
let buffer = tx::PayloadBuffer::new(buffer);
match message.write_payload(buffer, gso.count).and_then(|size| {
if size == 0 {
Err(tx::Error::EmptyPayload)
} else {
Ok(size)
}
}) {
Err(err) => {
unsafe {
prev_message.set_payload_len(payload_len);
}
Err(err)
}
Ok(size) => {
debug_assert_ne!(size, 0, "payloads should never be empty");
unsafe {
debug_assert!(
gso.size >= size,
"the payload tried to write more than available"
);
prev_message.set_payload_len(payload_len + size.min(gso.size));
}
gso.count += 1;
debug_assert!(
gso.count <= max_segments,
"{} cannot exceed {}",
gso.count,
max_segments
);
let index = gso.index;
let size_mismatch = gso.size != size;
let at_segment_limit = gso.count >= max_segments;
let at_payload_limit = gso.size * (gso.count + 1) > u16::MAX as usize;
if size_mismatch || at_segment_limit || at_payload_limit {
self.flush_gso();
}
Ok(Ok(tx::Outcome { len: size, index }))
}
}
}
}
impl<'a, Message: message::Message, R> Drop for Slice<'a, Message, R> {
#[inline]
fn drop(&mut self) {
self.flush_gso()
}
}
impl<'a, Message: message::Message, R> Deref for Slice<'a, Message, R> {
type Target = [Message];
#[inline]
fn deref(&self) -> &Self::Target {
&self.messages[self.primary.range()]
}
}
impl<'a, Message: message::Message, R> DerefMut for Slice<'a, Message, R> {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.messages[self.primary.range()]
}
}
impl<
'a,
Message: rx::Entry<Handle = H> + message::Message<Handle = H>,
B: Behavior,
H: path::Handle,
> rx::Queue for Slice<'a, Message, B>
{
type Entry = Message;
type Handle = H;
#[inline]
fn local_address(&self) -> LocalAddress {
*self.local_address
}
#[inline]
fn as_slice_mut(&mut self) -> &mut [Message] {
let range = self.primary.range();
&mut self.messages[range]
}
#[inline]
fn len(&self) -> usize {
self.primary.len
}
#[inline]
fn finish(&mut self, count: usize) {
self.advance(count)
}
}
impl<
'a,
Message: tx::Entry<Handle = H> + message::Message<Handle = H>,
B: Behavior,
H: path::Handle,
> tx::Queue for Slice<'a, Message, B>
{
type Entry = Message;
type Handle = H;
#[inline]
fn push<M: tx::Message<Handle = Self::Handle>>(
&mut self,
message: M,
) -> Result<tx::Outcome, tx::Error> {
let message = match self.try_gso(message)? {
Ok(outcome) => return Ok(outcome),
Err(message) => message,
};
let index = self
.primary
.index(self.secondary)
.ok_or(tx::Error::AtCapacity)?;
let size = self.messages[index].set(message)?;
self.advance(1);
if Message::SUPPORTS_GSO && self.max_gso > 1 {
self.gso_segment = Some(GsoSegment {
index,
count: 1,
size,
});
}
Ok(tx::Outcome { len: size, index })
}
#[inline]
fn as_slice_mut(&mut self) -> &mut [Message] {
&mut self.messages[self.secondary.range()]
}
#[inline]
#[allow(unknown_lints, clippy::misnamed_getters)] fn capacity(&self) -> usize {
self.primary.len
}
#[inline]
fn len(&self) -> usize {
self.secondary.len
}
}