pub(crate) mod recv;
pub(crate) mod send;
pub use self::recv::*;
pub use self::send::*;
use std::collections::BTreeMap;
use std::collections::HashMap;
use crate::protocol::frame::FragmentMeta;
use crate::protocol::frame::Frame;
use crate::protocol::reliability::Reliability;
use crate::protocol::RAKNET_HEADER_FRAME_OVERHEAD;
use crate::server::current_epoch;
/// Errors reported by [`NetQueue`] implementations.
///
/// `E` is the implementation-specific error type carried by
/// [`NetQueueError::Other`].
#[derive(Debug, Clone)]
pub enum NetQueueError<E> {
/// NOTE(review): presumably an insertion was rejected; this variant is not
/// constructed by the code visible in this module - confirm against callers.
InvalidInsertion,
/// NOTE(review): presumably like `InvalidInsertion`, with a human-readable
/// reason attached; not constructed by the code visible in this module.
InvalidInsertionKnown(String),
/// Returned by `RecoveryQueue`'s `NetQueue::remove` and `NetQueue::get`
/// when no item is stored under the requested key.
ItemDeletionFail,
/// NOTE(review): presumably the requested item is not valid; not constructed
/// by the code visible in this module.
InvalidItem,
/// NOTE(review): presumably the queue had nothing to return; not constructed
/// by the code visible in this module.
EmptyQueue,
/// Wraps an implementation-specific error of type `E`.
Other(E),
}
/// Common interface for keyed queues of `Item`s.
///
/// Every operation reports failure as a [`NetQueueError`] parameterised with
/// the implementation's own [`NetQueue::Error`] type.
pub trait NetQueue<Item> {
/// Key type handed back by `insert` and accepted by `remove` / `get`.
type KeyId;
/// Implementation-specific error carried by `NetQueueError::Other`.
type Error;
/// Stores `item` and returns the key it was stored under.
fn insert(&mut self, item: Item) -> Result<Self::KeyId, NetQueueError<Self::Error>>;
/// Takes the item stored under `key` out of the queue and returns it.
fn remove(&mut self, key: Self::KeyId) -> Result<Item, NetQueueError<Self::Error>>;
/// Returns a reference to the item stored under `key`; the item stays in the queue.
fn get(&mut self, key: Self::KeyId) -> Result<&Item, NetQueueError<Self::Error>>;
/// Returns items held by the queue as a `Vec`.
///
/// NOTE(review): the only implementation visible here (`RecoveryQueue`)
/// empties the queue and returns everything - confirm other implementors agree.
fn flush(&mut self) -> Result<Vec<Item>, NetQueueError<Self::Error>>;
}
/// Keeps items under a `u32` key together with the [`current_epoch`] value
/// observed when each item was stored, so that entries older than a given
/// age can later be pulled back out with [`RecoveryQueue::flush_old`].
#[derive(Debug, Clone)]
pub struct RecoveryQueue<Item> {
    /// key -> (`current_epoch()` at insertion, item)
    queue: HashMap<u32, (u64, Item)>,
}

impl<Item> RecoveryQueue<Item>
where
    Item: Clone,
{
    /// Creates an empty queue.
    pub fn new() -> Self {
        Self {
            queue: HashMap::new(),
        }
    }

    /// Stores `item` under the caller-chosen key `seq`, stamped with the
    /// current epoch. An existing entry with the same key is replaced.
    pub fn insert_id(&mut self, seq: u32, item: Item) {
        self.queue.insert(seq, (current_epoch(), item));
    }

    /// Returns a clone of every stored `(key, item)` pair, in unspecified
    /// (hash map) order. The queue itself is left untouched.
    pub fn get_all(&mut self) -> Vec<(u32, Item)> {
        self.queue
            .iter()
            .map(|(seq, (_, item))| (*seq, item.clone()))
            .collect()
    }

    /// Removes and returns every item whose insertion stamp plus `threshold`
    /// lies strictly before the current epoch; all other items stay queued.
    ///
    /// `threshold` is expressed in the same unit as [`current_epoch`].
    /// The returned items are in unspecified order.
    pub fn flush_old(&mut self, threshold: u64) -> Vec<Item> {
        // Read the clock exactly once. Deciding "expired" and "retained" against
        // two different readings (or with `<` for one and `>` for the other)
        // lets an entry sitting exactly on the boundary vanish without being
        // returned.
        let now = current_epoch();

        let expired: Vec<u32> = self
            .queue
            .iter()
            // saturating_add: a huge threshold must not overflow; it simply
            // means "never expired".
            .filter(|(_, (stored_at, _))| stored_at.saturating_add(threshold) < now)
            .map(|(seq, _)| *seq)
            .collect();

        // Move the expired items out instead of cloning and then dropping them.
        expired
            .into_iter()
            .filter_map(|seq| self.queue.remove(&seq))
            .map(|(_, item)| item)
            .collect()
    }
}
/// [`NetQueue`] view of a [`RecoveryQueue`], for callers that want the queue
/// to pick the key itself.
impl<Item> NetQueue<Item> for RecoveryQueue<Item> {
    type KeyId = u32;
    type Error = ();

    /// Stores `item` under a key that is not currently in use and returns
    /// that key.
    ///
    /// The key is the current length of the queue when that slot is free
    /// (the common case); otherwise the next free key above it.
    ///
    /// # Errors
    /// `InvalidInsertion` if every `u32` key is already taken.
    fn insert(&mut self, item: Item) -> Result<Self::KeyId, NetQueueError<Self::Error>> {
        // Guarantees both that the cast below is lossless and that the probe
        // loop has at least one free key to find.
        if self.queue.len() >= u32::MAX as usize {
            return Err(NetQueueError::InvalidInsertion);
        }

        // `len()` alone is not a safe key: after a `remove`, or when keys were
        // chosen by `insert_id`, it can name a live entry, and inserting there
        // would silently overwrite that item.
        let mut index = self.queue.len() as u32;
        while self.queue.contains_key(&index) {
            index = index.wrapping_add(1);
        }

        self.queue.insert(index, (current_epoch(), item));
        Ok(index)
    }

    /// Removes and returns the item stored under `key`.
    ///
    /// # Errors
    /// `ItemDeletionFail` if nothing is stored under `key`.
    fn remove(&mut self, key: Self::KeyId) -> Result<Item, NetQueueError<Self::Error>> {
        self.queue
            .remove(&key)
            .map(|(_, item)| item)
            .ok_or(NetQueueError::ItemDeletionFail)
    }

    /// Borrows the item stored under `key`.
    ///
    /// # Errors
    /// `ItemDeletionFail` if nothing is stored under `key`.
    fn get(&mut self, key: Self::KeyId) -> Result<&Item, NetQueueError<Self::Error>> {
        // NOTE(review): `InvalidItem` would describe a failed lookup better, but
        // the variant is kept as-is because callers may already match on it.
        self.queue
            .get(&key)
            .map(|(_, item)| item)
            .ok_or(NetQueueError::ItemDeletionFail)
    }

    /// Empties the queue and returns every item, in unspecified order.
    /// This implementation never fails.
    fn flush(&mut self) -> Result<Vec<Item>, NetQueueError<Self::Error>> {
        Ok(self.queue.drain().map(|(_, (_, item))| item).collect())
    }
}
/// Buffers items that arrive tagged with a `u32` index and releases them in
/// index order.
///
/// * `window.0` - the next index [`OrderedQueue::flush`] will release; indices
///   below it are rejected by [`OrderedQueue::insert`].
/// * `window.1` - one past the highest index stored so far.
#[derive(Debug, Clone)]
pub struct OrderedQueue<Item: Clone + std::fmt::Debug> {
    pub queue: BTreeMap<u32, Item>,
    pub window: (u32, u32),
}

impl<Item> OrderedQueue<Item>
where
    Item: Clone + std::fmt::Debug,
{
    /// Creates an empty queue whose window starts at index 0.
    pub fn new() -> Self {
        Self {
            queue: BTreeMap::new(),
            window: (0, 0),
        }
    }

    /// Advances the start of the window by one (wrapping at `u32::MAX`) and
    /// returns the new start.
    pub fn next(&mut self) -> u32 {
        self.window.0 = self.window.0.wrapping_add(1);
        self.window.0
    }

    /// Stores `item` at `index` unless the index lies before the window start
    /// or is already occupied.
    ///
    /// Returns `true` when the item was stored, `false` when it was rejected.
    pub fn insert(&mut self, index: u32, item: Item) -> bool {
        if index < self.window.0 || self.queue.contains_key(&index) {
            return false;
        }
        self.insert_abs(index, item);
        true
    }

    /// Stores `item` at `index` unconditionally, replacing any item already
    /// there, and extends the end of the window if needed.
    pub fn insert_abs(&mut self, index: u32, item: Item) {
        if index >= self.window.1 {
            // wrapping_add: `index` may be `u32::MAX`; a plain `+ 1` would
            // panic in debug builds. This matches how `window.0` is advanced.
            self.window.1 = index.wrapping_add(1);
        }
        self.queue.insert(index, item);
    }

    /// Returns, in ascending order, every index inside the window
    /// (`window.0..window.1`) that has no item stored.
    ///
    /// The cost is proportional to the width of the window.
    pub fn missing(&self) -> Vec<u32> {
        (self.window.0..self.window.1)
            .filter(|index| !self.queue.contains_key(index))
            .collect()
    }

    /// Removes and returns the contiguous run of items starting at the window
    /// start, in delivery order, advancing the window start past them.
    ///
    /// Stops at the first gap; returns an empty `Vec` if the item at the
    /// window start has not arrived yet.
    pub fn flush(&mut self) -> Vec<Item> {
        let mut items = Vec::new();
        // Items are taken out in exactly the order they must be delivered, so
        // no sort is needed - and sorting by raw index would put the items
        // *after* a wraparound in front of the ones before it.
        while let Some(item) = self.queue.remove(&self.window.0) {
            items.push(item);
            self.window.0 = self.window.0.wrapping_add(1);
        }
        items
    }
}
/// Stores the pieces of fragmented frames, grouped by fragment id, both for
/// reassembling incoming fragments ([`FragmentQueue::insert`] /
/// [`FragmentQueue::collect`]) and for splitting outgoing buffers
/// ([`FragmentQueue::split_insert`]).
#[derive(Clone, Debug)]
pub struct FragmentQueue {
    /// Id assigned by the most recent successful `split_insert`.
    fragment_id: u16,
    /// fragment id -> (expected number of fragments, fragments stored so far)
    fragments: HashMap<u16, (u32, Vec<Frame>)>,
}

impl FragmentQueue {
    /// Creates an empty queue.
    pub fn new() -> Self {
        Self {
            fragment_id: 0,
            fragments: HashMap::new(),
        }
    }

    /// Adds one fragment to the set identified by its fragment metadata,
    /// creating the set if this is the first fragment seen for that id.
    ///
    /// Returns `(size, index)` taken from the fragment's own metadata.
    ///
    /// # Errors
    /// * `FrameNotFragmented` - the frame carries no fragment metadata.
    /// * `FrameIndexOutOfBounds` - the fragment index is not below the set's
    ///   fragment count (for a new set, the count the fragment itself declares).
    /// * `FrameExists` - a fragment with the same index is already stored.
    pub fn insert(&mut self, fragment: Frame) -> Result<(u32, u32), FragmentQueueError> {
        // Copy the three scalar fields out so the frame itself can be moved
        // into the map without cloning its metadata.
        let (id, index, size) = match fragment.fragment_meta.as_ref() {
            Some(meta) => (meta.id, meta.index, meta.size),
            None => return Err(FragmentQueueError::FrameNotFragmented),
        };

        if let Some((expected, frames)) = self.fragments.get_mut(&id) {
            if index >= *expected {
                return Err(FragmentQueueError::FrameIndexOutOfBounds);
            }
            let already_stored = frames.iter().any(|stored| {
                stored
                    .fragment_meta
                    .as_ref()
                    .map_or(false, |meta| meta.index == index)
            });
            if already_stored {
                return Err(FragmentQueueError::FrameExists);
            }
            frames.push(fragment);
        } else {
            // Apply the same bounds rule to the first fragment of a set. Without
            // it a bogus first fragment (e.g. size 0) creates a set that can
            // never be completed, or one that completes with the wrong pieces.
            if index >= size {
                return Err(FragmentQueueError::FrameIndexOutOfBounds);
            }
            self.fragments.insert(id, (size, vec![fragment]));
        }

        Ok((size, index))
    }

    /// Reassembles the set `id` into one buffer by concatenating the fragment
    /// bodies in index order, and removes the set from the queue.
    ///
    /// # Errors
    /// * `FragmentInvalid` - no set with this id exists.
    /// * `FragmentsMissing` - the set exists but is not complete yet; it is
    ///   left in the queue.
    pub fn collect(&mut self, id: u16) -> Result<Vec<u8>, FragmentQueueError> {
        let (expected, frames) = self
            .fragments
            .get(&id)
            .ok_or(FragmentQueueError::FragmentInvalid)?;
        if *expected != frames.len() as u32 {
            return Err(FragmentQueueError::FragmentsMissing);
        }

        // The set is complete: take ownership of it so it leaves the queue.
        let (_, mut frames) = self
            .fragments
            .remove(&id)
            .ok_or(FragmentQueueError::FragmentInvalid)?;
        frames.sort_by_key(|frame| frame.fragment_meta.as_ref().map(|meta| meta.index));

        let total = frames.iter().map(|frame| frame.body.len()).sum::<usize>();
        let mut buffer = Vec::with_capacity(total);
        for frame in &frames {
            buffer.extend_from_slice(&frame.body);
        }
        Ok(buffer)
    }

    /// Splits `buffer` into fragments that fit `mtu`, stores them under a
    /// freshly assigned fragment id and returns that id.
    ///
    /// Ids are assigned sequentially and wrap around at `u16::MAX`; a set
    /// still stored under a reused id is replaced.
    ///
    /// # Errors
    /// `DoesNotNeedSplit` - `buffer` already fits into a single frame. In that
    /// case no id is consumed and the queue is left unchanged.
    ///
    /// # Panics
    /// See [`FragmentQueue::split`].
    pub fn split_insert(&mut self, buffer: &[u8], mtu: u16) -> Result<u16, FragmentQueueError> {
        // Plain wrapping increment. `+= wrapping_add(1)` yields 1, 3, 7, ... and
        // then sticks at 65535, so every later split would reuse the same id.
        let id = self.fragment_id.wrapping_add(1);
        let frames = Self::split(buffer, id, mtu)?;

        self.fragment_id = id;
        // `HashMap::insert` replaces any stale set left under a reused id.
        self.fragments.insert(id, (frames.len() as u32, frames));
        Ok(id)
    }

    /// Splits `buffer` into `ReliableOrd` frames whose bodies are at most
    /// `mtu - RAKNET_HEADER_FRAME_OVERHEAD` bytes, each tagged with fragment
    /// metadata (`id`, its index, and the total fragment count).
    ///
    /// # Errors
    /// `DoesNotNeedSplit` - `buffer` is not longer than the per-frame limit.
    ///
    /// # Panics
    /// NOTE(review): `mtu` must be greater than `RAKNET_HEADER_FRAME_OVERHEAD`.
    /// A smaller `mtu` underflows the subtraction (panic in debug builds), and
    /// an equal one asks for zero-sized chunks, which panics for a non-empty
    /// buffer. No existing error variant describes this, so it is left to the
    /// caller to validate.
    pub fn split(buffer: &[u8], id: u16, mtu: u16) -> Result<Vec<Frame>, FragmentQueueError> {
        let max_body = usize::from(mtu - RAKNET_HEADER_FRAME_OVERHEAD);
        if buffer.len() <= max_body {
            return Err(FragmentQueueError::DoesNotNeedSplit);
        }

        // `chunks` knows its exact length up front, so the total count can be
        // read without first copying every chunk into its own Vec.
        let chunks = buffer.chunks(max_body);
        let size = chunks.len() as u32;

        let frames = chunks
            .enumerate()
            .map(|(index, chunk)| {
                let mut frame = Frame::new(Reliability::ReliableOrd, Some(chunk));
                frame.fragment_meta = Some(FragmentMeta {
                    index: index as u32,
                    size,
                    id,
                });
                frame
            })
            .collect();
        Ok(frames)
    }

    /// Borrows the `(expected count, fragments)` entry stored under `id`.
    ///
    /// # Errors
    /// `FragmentInvalid` - no set with this id exists.
    pub fn get(&self, id: &u16) -> Result<&(u32, Vec<Frame>), FragmentQueueError> {
        self.fragments
            .get(id)
            .ok_or(FragmentQueueError::FragmentInvalid)
    }

    /// Mutably borrows the `(expected count, fragments)` entry stored under `id`.
    ///
    /// # Errors
    /// `FragmentInvalid` - no set with this id exists.
    pub fn get_mut(&mut self, id: &u16) -> Result<&mut (u32, Vec<Frame>), FragmentQueueError> {
        self.fragments
            .get_mut(id)
            .ok_or(FragmentQueueError::FragmentInvalid)
    }

    /// Drops the set stored under `id`; returns whether one existed.
    pub fn remove(&mut self, id: &u16) -> bool {
        self.fragments.remove(id).is_some()
    }

    /// Drops every stored set and resets the id counter to 0.
    pub fn clear(&mut self) {
        self.fragment_id = 0;
        self.fragments.clear();
    }
}
/// Errors returned by `FragmentQueue` operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum FragmentQueueError {
/// `insert`: a fragment with the same index is already stored for that fragment id.
FrameExists,
/// `insert`: the frame carries no fragment metadata.
FrameNotFragmented,
/// `split` / `split_insert`: the buffer is not longer than the per-frame
/// limit, so there is nothing to split.
DoesNotNeedSplit,
/// `collect` / `get` / `get_mut`: no fragment set is stored under the requested id.
FragmentInvalid,
/// `collect`: the set exists but the number of stored fragments does not yet
/// match the expected count.
FragmentsMissing,
/// `insert`: the fragment's index is not below the fragment count of its set.
FrameIndexOutOfBounds,
}