use super::FragmentRc;
use super::FragmentRef;
use std::collections::VecDeque;
use std::sync::Arc;
/// One queued fragment together with the deadline after which it is dropped unsent.
struct TxEntry {
    /// The fragment waiting to be handed out.
    fragment: FragmentRc,
    /// Deadline in milliseconds; the entry is discarded by `TxBuffer::pop_expired` once the
    /// supplied `now_ms` reaches this value.
    expire_time_ms: u64,
}
/// Transmit-side queue: packets are split into fragments on `push` and handed out one at a
/// time, each tagged with a sequential fragment identifier.
pub struct TxBuffer {
    /// Fragments not yet handed out, oldest at the front.
    fragments: VecDeque<TxEntry>,
    /// Identifier reported for the next fragment returned by `peek_sendable` / `pop_sendable`.
    next_fragment_id: u32,
    /// Maximum number of payload bytes a single fragment may cover.
    fragment_size: usize,
}
impl TxBuffer {
    /// Creates an empty transmit buffer.
    ///
    /// `base_id` is the identifier reported for the first fragment handed out, and
    /// `fragment_size` is the largest number of payload bytes a fragment may cover.
    ///
    /// # Panics
    ///
    /// Panics if `fragment_size` is zero.
    pub fn new(base_id: u32, fragment_size: usize) -> Self {
        assert!(fragment_size > 0);

        Self {
            fragments: VecDeque::new(),
            next_fragment_id: base_id,
            fragment_size,
        }
    }

    /// Splits `packet` into fragments and appends them to the back of the queue.
    ///
    /// Every fragment except the last covers exactly `fragment_size` bytes; the last covers
    /// whatever remains (between 1 and `fragment_size` bytes, or 0 for an empty packet). An
    /// empty packet therefore still produces one fragment, flagged both first and last.
    /// The packet bytes are not copied: each fragment holds a shared handle to the packet
    /// plus the byte range it covers. All fragments receive the same `expire_time_ms`.
    pub fn push(&mut self, packet: Box<[u8]>, expire_time_ms: u64) {
        let total_len = packet.len();
        let chunk = self.fragment_size;
        let shared = Arc::new(packet);

        // Number of full-sized fragments that precede the final one. The final fragment
        // always exists and is never empty unless the packet itself is empty.
        let full_fragments = total_len.saturating_sub(1) / chunk;

        self.fragments.extend((0..full_fragments).map(|n| {
            let start = n * chunk;
            TxEntry {
                fragment: FragmentRc {
                    first: n == 0,
                    last: false,
                    data: Arc::clone(&shared),
                    data_range: start..start + chunk,
                },
                expire_time_ms,
            }
        }));

        let tail_start = full_fragments * chunk;
        self.fragments.push_back(TxEntry {
            fragment: FragmentRc {
                first: full_fragments == 0,
                last: true,
                data: shared,
                data_range: tail_start..total_len,
            },
            expire_time_ms,
        });
    }

    /// Drops entries from the front of the queue whose deadline is at or before `now_ms`.
    ///
    /// Stops at the first entry that has not expired, so expired entries queued behind a
    /// live one stay in place. The fragment identifier counter is not touched.
    pub fn pop_expired(&mut self, now_ms: u64) {
        while self
            .fragments
            .front()
            .map_or(false, |entry| entry.expire_time_ms <= now_ms)
        {
            self.fragments.pop_front();
        }
    }

    /// Returns the identifier and a reference to the fragment at the front of the queue
    /// without removing it, or `None` when the queue is empty.
    pub fn peek_sendable(&self) -> Option<(u32, &FragmentRc)> {
        self.fragments
            .front()
            .map(|entry| (self.next_fragment_id, &entry.fragment))
    }

    /// Removes the fragment at the front of the queue and returns it with its identifier,
    /// or `None` when the queue is empty.
    ///
    /// The identifier counter advances by one per returned fragment and wraps on overflow.
    pub fn pop_sendable(&mut self) -> Option<(u32, FragmentRc)> {
        let entry = self.fragments.pop_front()?;
        let id = self.next_fragment_id;
        self.next_fragment_id = id.wrapping_add(1);
        Some((id, entry.fragment))
    }
}
/// A packet that is still being reassembled from its fragments.
pub struct PacketBuild {
    /// Payload bytes collected so far, in the order the fragments were accepted.
    buffer: Vec<u8>,
}
/// Receive-side state used to reassemble packets from incoming fragments.
pub struct RxBuffer {
    /// Size every non-final fragment must have, and the upper bound for a final fragment.
    fragment_size: usize,
    /// Identifier a continuation (non-first) fragment must carry to be accepted.
    base_id: u32,
    /// The packet currently being reassembled, if any.
    current_build: Option<PacketBuild>,
}
impl RxBuffer {
    /// Creates a receive buffer with no packet in progress.
    ///
    /// `base_id` is the identifier initially expected for a continuation fragment and
    /// `fragment_size` is the size every non-final fragment must have.
    ///
    /// # Panics
    ///
    /// Panics if `fragment_size` is zero.
    pub fn new(base_id: u32, fragment_size: usize) -> Self {
        assert!(fragment_size > 0);

        Self {
            fragment_size,
            base_id,
            current_build: None,
        }
    }

    /// Checks the payload length of `fragment`: a final fragment may carry up to
    /// `fragment_size` bytes, any other fragment must carry exactly `fragment_size` bytes.
    fn fragment_is_valid(fragment: &FragmentRef, fragment_size: usize) -> bool {
        let len = fragment.data.len();
        if fragment.last {
            len <= fragment_size
        } else {
            len == fragment_size
        }
    }

    /// Feeds one received fragment into the reassembly state.
    ///
    /// Returns the completed packet when `fragment` finishes one, otherwise `None`.
    ///
    /// * A fragment with an invalid length is ignored.
    /// * A first fragment is accepted with any `fragment_id`; it discards any packet in
    ///   progress and sets the expected identifier to `fragment_id + 1` (wrapping).
    /// * A continuation fragment is accepted only if `fragment_id` equals the expected
    ///   identifier and a packet is in progress; otherwise it is ignored and the state is
    ///   left untouched.
    pub fn receive(&mut self, fragment_id: u32, fragment: &FragmentRef) -> Option<Box<[u8]>> {
        if !Self::fragment_is_valid(fragment, self.fragment_size) {
            return None;
        }

        if fragment.first {
            self.base_id = fragment_id.wrapping_add(1);

            if fragment.last {
                // Whole packet in a single fragment: nothing to accumulate.
                self.current_build = None;
                return Some(fragment.data.into());
            }

            self.current_build = Some(PacketBuild {
                buffer: fragment.data.to_vec(),
            });
            return None;
        }

        if fragment_id != self.base_id {
            return None;
        }

        // A continuation with no packet in progress is dropped without advancing the
        // expected identifier.
        let build = self.current_build.as_mut()?;
        build.buffer.extend_from_slice(fragment.data);
        self.base_id = fragment_id.wrapping_add(1);

        if !fragment.last {
            return None;
        }

        self.current_build
            .take()
            .map(|finished| finished.buffer.into_boxed_slice())
    }

    /// Discards any partially reassembled packet. The expected identifier is unchanged.
    pub fn reset(&mut self) {
        self.current_build = None;
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::ops::Range;

    /// Pops the next fragment, first checking that `peek_sendable` reports the same
    /// identifier and fragment that `pop_sendable` then returns.
    fn peek_and_pop_sendable(send_buf: &mut TxBuffer) -> Option<(u32, FragmentRc)> {
        let peeked = send_buf
            .peek_sendable()
            .map(|(fragment_id, fragment)| (fragment_id, fragment.clone()));
        let popped = send_buf.pop_sendable();
        assert_eq!(peeked, popped);
        popped
    }

    /// Asserts that the buffer has nothing left to send.
    fn expect_pop_fail(send_buf: &mut TxBuffer) {
        assert_eq!(peek_and_pop_sendable(send_buf), None);
    }

    /// Pushes `packet_data` into a fresh buffer and checks that the fragments come out with
    /// consecutive (wrapping) identifiers starting at `initial_base_id`, covering exactly
    /// `ranges`, with the first/last flags set on the first and last range respectively.
    fn fragmentation_trial(
        initial_base_id: u32,
        fragment_size: usize,
        packet_data: &[u8],
        ranges: &[Range<usize>],
    ) {
        let mut send_buf = TxBuffer::new(initial_base_id, fragment_size);
        send_buf.push(Box::from(packet_data), u64::MAX);

        let expected_data: Arc<Box<[u8]>> = Arc::new(Box::from(packet_data));
        let mut expected_id = initial_base_id;

        for (idx, range) in ranges.iter().enumerate() {
            let (fragment_id, fragment) = peek_and_pop_sendable(&mut send_buf).unwrap();
            assert_eq!(fragment_id, expected_id);
            assert_eq!(
                fragment,
                FragmentRc {
                    first: idx == 0,
                    last: idx + 1 == ranges.len(),
                    data: Arc::clone(&expected_data),
                    data_range: range.clone(),
                }
            );
            expected_id = expected_id.wrapping_add(1);
        }

        expect_pop_fail(&mut send_buf);
    }

    #[test]
    fn send_fragmentation() {
        const FRAGMENT_SIZE: usize = 4;
        const ID_SWEEP_SIZE: u32 = 4;

        // (packet payload, byte ranges of the fragments it must be split into)
        let trials: Vec<(Vec<u8>, Vec<Range<usize>>)> = vec![
            (vec![], vec![0..0]),
            (vec![0], vec![0..1]),
            (vec![0, 1], vec![0..2]),
            (vec![0, 1, 2], vec![0..3]),
            (vec![0, 1, 2, 3], vec![0..4]),
            (vec![0, 1, 2, 3, 4], vec![0..4, 4..5]),
            (vec![0, 1, 2, 3, 4, 5], vec![0..4, 4..6]),
            (vec![0, 1, 2, 3, 4, 5, 6], vec![0..4, 4..7]),
            (vec![0, 1, 2, 3, 4, 5, 6, 7], vec![0..4, 4..8]),
            (vec![0, 1, 2, 3, 4, 5, 6, 7, 8], vec![0..4, 4..8, 8..9]),
        ];

        // Sweep the starting identifier across the u32 wrap-around point.
        let sweep_start = 0_u32.wrapping_sub(ID_SWEEP_SIZE);
        for offset in 0..=ID_SWEEP_SIZE {
            let initial_base_id = sweep_start.wrapping_add(offset);
            for (packet_data, ranges) in &trials {
                fragmentation_trial(initial_base_id, FRAGMENT_SIZE, packet_data, ranges);
            }
        }
    }
}