use std::collections::VecDeque;
use crate::core::error::BluefinError;
use super::common::BluefinResult;
/// Upper bound on how far ahead of the smallest expected packet number a
/// packet number may be and still be accepted by
/// `SlidingWindow::insert_packet_number`. A packet number whose distance from
/// the smallest expected one is greater than or equal to this value is
/// rejected with a buffer-full error.
pub const MAX_SLIDING_WINDOW_SIZE: usize = 20000;
/// Tracks packet numbers that have been received but not yet consumed.
///
/// Packet numbers are buffered in ascending order without duplicates. They are
/// released by `consume` only once the buffered run starts at the smallest
/// expected packet number.
#[derive(Clone, Debug)]
pub(crate) struct SlidingWindow {
    /// Lowest packet number that is still acceptable. Anything below it is
    /// rejected on insert; `consume` advances it to one past the largest
    /// packet number it releases.
    smallest_expected_packet_number: u64,
    /// Buffered packet numbers, kept sorted ascending and free of duplicates
    /// by the binary-search insertion in `insert_packet_number`.
    ordered_packet_numbers: VecDeque<u64>,
}
/// Outcome of a successful `SlidingWindow::consume` call.
///
/// This is a plain pair of integers, so it is cheap to copy and fully
/// comparable.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) struct SlidingWindowConsumeResult {
    /// Largest packet number in the contiguous run that was consumed.
    pub(crate) largest_packet_number: u64,
    /// Size of the consumed run, counted from the packet number that was
    /// expected before the call up to and including `largest_packet_number`.
    pub(crate) num_acks_consumed: u64,
}
impl SlidingWindow {
    /// Builds an empty window whose first acceptable packet number is
    /// `smallest_expected_packet_number`.
    pub(crate) fn new(smallest_expected_packet_number: u64) -> Self {
        let ordered_packet_numbers = VecDeque::new();
        Self {
            smallest_expected_packet_number,
            ordered_packet_numbers,
        }
    }

    /// Buffers `packet_number`, keeping the buffer sorted in ascending order.
    ///
    /// # Errors
    ///
    /// * `BluefinError::UnexpectedPacketNumberError` if `packet_number` is
    ///   below the smallest expected packet number or is already buffered.
    /// * `BluefinError::BufferFullError` if `packet_number` lies
    ///   `MAX_SLIDING_WINDOW_SIZE` or more ahead of the smallest expected
    ///   packet number.
    pub(crate) fn insert_packet_number(&mut self, packet_number: u64) -> BluefinResult<()> {
        // `checked_sub` yields `None` exactly when the packet number is below
        // the smallest expected one.
        let distance = match packet_number.checked_sub(self.smallest_expected_packet_number) {
            Some(distance) => distance,
            None => return Err(BluefinError::UnexpectedPacketNumberError),
        };

        let capacity: u64 = MAX_SLIDING_WINDOW_SIZE.try_into().unwrap();
        if distance >= capacity {
            return Err(BluefinError::BufferFullError(
                "Sliding window buffer is full".to_string(),
            ));
        }

        // A hit means the number is already buffered; a miss reports the slot
        // that keeps the buffer sorted.
        match self.ordered_packet_numbers.binary_search(&packet_number) {
            Ok(_) => Err(BluefinError::UnexpectedPacketNumberError),
            Err(slot) => {
                self.ordered_packet_numbers.insert(slot, packet_number);
                Ok(())
            }
        }
    }

    /// Removes the contiguous run of buffered packet numbers at the front of
    /// the window and advances the smallest expected packet number past it.
    ///
    /// Returns `None` when nothing is buffered or when the lowest buffered
    /// packet number is still greater than the smallest expected one.
    /// Otherwise returns the largest packet number of the run and the number
    /// of packet numbers acknowledged by it.
    #[inline]
    pub(crate) fn consume(&mut self) -> Option<SlidingWindowConsumeResult> {
        let previously_expected = self.smallest_expected_packet_number;

        let head = *self.ordered_packet_numbers.front()?;
        if head > previously_expected {
            return None;
        }
        self.ordered_packet_numbers.pop_front();

        // Extend the run for as long as the next buffered number directly
        // follows the current end of the run.
        let mut run_end = head;
        while let Some(&candidate) = self.ordered_packet_numbers.front() {
            if candidate != run_end + 1 {
                break;
            }
            run_end = candidate;
            self.ordered_packet_numbers.pop_front();
        }

        self.smallest_expected_packet_number = run_end + 1;
        Some(SlidingWindowConsumeResult {
            largest_packet_number: run_end,
            num_acks_consumed: run_end - previously_expected + 1,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::SlidingWindow;
    use crate::{core::error::BluefinError, utils::window::MAX_SLIDING_WINDOW_SIZE};

    /// Walks a single window through rejection, buffering, and consumption.
    #[test]
    fn sliding_window_behaves_as_expected() {
        let window_size = u64::try_from(MAX_SLIDING_WINDOW_SIZE).unwrap();
        let mut window = SlidingWindow::new(100);

        // Nothing has been buffered yet.
        assert_eq!(window.consume(), None);

        // Packet numbers below the expected one are rejected.
        assert_eq!(
            window.insert_packet_number(99),
            Err(BluefinError::UnexpectedPacketNumberError)
        );
        assert_eq!(window.consume(), None);

        // Buffer a run that does not yet start at the expected number.
        for packet_number in [101, 102, 103, 104, 106] {
            assert_eq!(window.insert_packet_number(packet_number), Ok(()));
        }
        assert_eq!(window.consume(), None);

        // Duplicates are rejected.
        assert_eq!(
            window.insert_packet_number(103),
            Err(BluefinError::UnexpectedPacketNumberError)
        );

        // A packet number a full window ahead is rejected.
        assert!(window.insert_packet_number(100 + window_size).is_err());

        // Filling the gap at the front releases 100..=104.
        assert_eq!(window.insert_packet_number(100), Ok(()));
        match window.consume() {
            Some(released) => {
                assert_eq!(released.largest_packet_number, 104);
                assert_eq!(released.num_acks_consumed, 5);
            }
            None => panic!("expected the run 100..=104 to be consumed"),
        }
        assert!(window.consume().is_none());

        // 105 is still missing, so 106, 107 and 110 stay buffered.
        assert_eq!(window.insert_packet_number(107), Ok(()));
        assert_eq!(window.insert_packet_number(110), Ok(()));
        assert!(window.consume().is_none());

        // Supplying 105 releases 105..=107; 110 remains behind the gap.
        assert_eq!(window.insert_packet_number(105), Ok(()));
        match window.consume() {
            Some(released) => {
                assert_eq!(released.largest_packet_number, 107);
                assert_eq!(released.num_acks_consumed, 3);
            }
            None => panic!("expected the run 105..=107 to be consumed"),
        }
        assert!(window.consume().is_none());

        // The window bound is now measured from 108.
        assert!(window.insert_packet_number(108 + window_size).is_err());
        assert!(window.insert_packet_number(107 + window_size).is_ok());
    }
}