use std::collections::VecDeque;
use log::warn;
use naia_shared::{
sequence_greater_than, sequence_less_than, wrapping_diff, BitWrite, BitWriter,
LocalEntityAndGlobalEntityConverterMut, MessageContainer, MessageKinds, Serde,
ShortMessageIndex, Tick, TickBufferSettings, UnsignedVariableInteger,
};
/// Sending side of a tick-buffered channel: keeps messages grouped by tick and
/// serializes them into outgoing packets.
pub struct ChannelTickBufferSender {
/// Messages accepted through `send_message`, grouped by tick. Entries are pruned by
/// `collect_messages` and updated by `notify_message_delivered`.
sending_messages: OutgoingMessages,
/// Per-tick batches staged by `collect_messages` and consumed from the front by
/// `write_messages`.
outgoing_messages: VecDeque<(Tick, Vec<(ShortMessageIndex, MessageContainer)>)>,
/// Client sending tick recorded by the most recent collection pass.
last_sent: Tick,
/// Starts out `true`; `collect_messages` consults it to bypass the comparison
/// against `last_sent`.
never_sent: bool,
}
impl ChannelTickBufferSender {
    /// Creates an empty sender whose per-tick buffer is bounded by
    /// `settings.message_capacity`.
    pub fn new(settings: TickBufferSettings) -> Self {
        Self {
            sending_messages: OutgoingMessages::new(settings.message_capacity),
            outgoing_messages: VecDeque::new(),
            last_sent: 0,
            never_sent: true,
        }
    }

    /// Stages all buffered messages for the next packet write.
    ///
    /// The pass runs only when `client_sending_tick` is newer than the tick of the
    /// previous pass, or when no pass has happened yet. It first prunes buffered
    /// ticks via `pop_back_until_excluding(server_receivable_tick)`, then appends
    /// one batch per remaining tick to the outgoing queue.
    pub fn collect_messages(&mut self, client_sending_tick: &Tick, server_receivable_tick: &Tick) {
        let tick_advanced = sequence_greater_than(*client_sending_tick, self.last_sent);
        if !(tick_advanced || self.never_sent) {
            return;
        }

        self.sending_messages
            .pop_back_until_excluding(server_receivable_tick);

        self.last_sent = *client_sending_tick;
        // Clear the first-pass flag. Leaving it set would make the guard above
        // always pass, re-staging the same messages on every call within a tick.
        self.never_sent = false;

        for (message_tick, message_map) in self.sending_messages.iter() {
            if sequence_greater_than(*message_tick, *client_sending_tick) {
                warn!("Sending message that is more recent than client sending tick! This shouldn't be possible.");
                break;
            }
            let messages = message_map.collect_messages();
            self.outgoing_messages.push_back((*message_tick, messages));
        }
    }

    /// Buffers `message` under `host_tick` for later collection.
    pub fn send_message(&mut self, host_tick: &Tick, message: MessageContainer) {
        self.sending_messages.push(*host_tick, message);
    }

    /// Returns `true` when at least one staged batch is waiting to be written.
    pub fn has_messages(&self) -> bool {
        !self.outgoing_messages.is_empty()
    }

    /// Writes as many staged batches as fit into `writer`.
    ///
    /// Each batch is first serialized into a counter obtained from `writer`. It is
    /// written for real, preceded by a `true` continuation bit, only if that trial
    /// did not overflow. The first batch that does not fit stops the loop and stays
    /// queued. `has_written` is set once any batch has been written.
    ///
    /// Returns the `(tick, message index)` pair of every message written. The
    /// result is always `Some`.
    ///
    /// # Panics
    ///
    /// Panics (through `warn_overflow`) when a batch does not fit and `has_written`
    /// is still `false`.
    pub fn write_messages(
        &mut self,
        message_kinds: &MessageKinds,
        converter: &mut dyn LocalEntityAndGlobalEntityConverterMut,
        writer: &mut BitWriter,
        host_tick: &Tick,
        has_written: &mut bool,
    ) -> Option<Vec<(Tick, ShortMessageIndex)>> {
        let mut last_written_tick = *host_tick;
        let mut output = Vec::new();

        while let Some((message_tick, messages)) = self.outgoing_messages.front() {
            // Trial run: serialize into the counter to learn whether the
            // continuation bit plus this batch still fits.
            let mut counter = writer.counter();
            true.ser(&mut counter);
            self.write_message(
                message_kinds,
                converter,
                &mut counter,
                &last_written_tick,
                message_tick,
                messages,
            );
            if counter.overflowed() {
                if !*has_written {
                    self.warn_overflow(messages, counter.bits_needed(), writer.bits_free());
                }
                break;
            }

            *has_written = true;

            // Real run: continuation bit followed by the batch itself.
            true.ser(writer);
            let message_indices = self.write_message(
                message_kinds,
                converter,
                writer,
                &last_written_tick,
                message_tick,
                messages,
            );

            let written_tick = *message_tick;
            last_written_tick = written_tick;
            output.extend(
                message_indices
                    .into_iter()
                    .map(|message_index| (written_tick, message_index)),
            );

            self.outgoing_messages.pop_front();
        }

        Some(output)
    }

    /// Serializes one tick's batch: the tick difference against
    /// `last_written_tick`, the message count, then each message preceded by the
    /// difference between its index and the previously written index.
    ///
    /// Returns the indices of the messages written, in order.
    fn write_message(
        &self,
        message_kinds: &MessageKinds,
        converter: &mut dyn LocalEntityAndGlobalEntityConverterMut,
        writer: &mut dyn BitWrite,
        last_written_tick: &Tick,
        message_tick: &Tick,
        messages: &[(ShortMessageIndex, MessageContainer)],
    ) -> Vec<ShortMessageIndex> {
        let mut message_indices = Vec::with_capacity(messages.len());

        // NOTE(review): presumably non-negative because batches are staged
        // newest-first — confirm against `wrapping_diff`'s argument order.
        let message_tick_diff = wrapping_diff(*message_tick, *last_written_tick);
        let message_tick_diff_encoded = UnsignedVariableInteger::<3>::new(message_tick_diff);
        message_tick_diff_encoded.ser(writer);

        let message_count = UnsignedVariableInteger::<3>::new(messages.len() as u64);
        message_count.ser(writer);

        let mut last_id_written: ShortMessageIndex = 0;
        for (message_index, message) in messages {
            let id_diff = UnsignedVariableInteger::<2>::new(*message_index - last_id_written);
            id_diff.ser(writer);

            message.write(message_kinds, writer, converter);

            message_indices.push(*message_index);
            last_id_written = *message_index;
        }

        message_indices
    }

    /// Records that the message at `message_index` of `tick` was delivered, so it
    /// is no longer collected for sending.
    pub fn notify_message_delivered(&mut self, tick: &Tick, message_index: &ShortMessageIndex) {
        self.sending_messages.remove_message(tick, message_index);
    }

    /// Reports a batch that cannot fit into an otherwise empty packet.
    ///
    /// # Panics
    ///
    /// Always panics, naming the comma-separated message types in the batch along
    /// with the bits needed and the bits available.
    fn warn_overflow(
        &self,
        messages: &[(ShortMessageIndex, MessageContainer)],
        bits_needed: u32,
        bits_free: u32,
    ) {
        let mut message_names = String::new();
        for (position, (_id, message)) in messages.iter().enumerate() {
            if position > 0 {
                message_names.push(',');
            }
            message_names.push_str(&message.name());
        }
        panic!(
            "Packet Write Error: Blocking overflow detected! Messages of type `{message_names}` requires {bits_needed} bits, but packet only has {bits_free} bits available! This condition should never be reached, as large Messages should be Fragmented in the Reliable channel"
        )
    }
}
/// The messages recorded for a single tick, addressed by insertion position.
struct MessageMap {
/// One slot per inserted message; `remove` clears a slot to `None` without
/// shifting the others, so positions stay stable.
list: Vec<Option<MessageContainer>>,
}
impl MessageMap {
pub fn new() -> Self {
MessageMap { list: Vec::new() }
}
pub fn insert(&mut self, message: MessageContainer) {
self.list.push(Some(message));
}
pub fn collect_messages(&self) -> Vec<(ShortMessageIndex, MessageContainer)> {
let mut output = Vec::new();
for (index, message_opt) in self.list.iter().enumerate() {
if let Some(message) = message_opt {
output.push((index as u8, message.clone()));
}
}
output
}
pub fn remove(&mut self, message_index: &ShortMessageIndex) {
if let Some(container) = self.list.get_mut(*message_index as usize) {
*container = None;
}
}
pub fn len(&self) -> usize {
self.list.len()
}
}
/// Bounded queue of per-tick message maps awaiting delivery.
struct OutgoingMessages {
/// Tick entries; `push` adds new ticks at the front, and pruning removes from the back.
buffer: VecDeque<(Tick, MessageMap)>,
/// Maximum number of tick entries retained after a `push`.
capacity: usize,
}
impl OutgoingMessages {
    /// Creates an empty queue that retains at most `capacity` tick entries.
    pub fn new(capacity: usize) -> Self {
        OutgoingMessages {
            buffer: VecDeque::new(),
            capacity,
        }
    }

    /// Records `message` under `message_tick`.
    ///
    /// A message for the tick currently at the front joins that tick's map. A
    /// message for a newer tick opens a new front entry, after which entries beyond
    /// `capacity` are dropped from the back. A message for a tick older than the
    /// front is rejected with a warning.
    pub fn push(&mut self, message_tick: Tick, message: MessageContainer) {
        if let Some((front_tick, msg_map)) = self.buffer.front_mut() {
            if message_tick == *front_tick {
                msg_map.insert(message);
                return;
            }
            if sequence_less_than(message_tick, *front_tick) {
                warn!("This method should always receive increasing or equal Ticks! \
Received Tick: {message_tick} after receiving {front_tick}. \
Possibly try ensuring that Client.send_message() is only called on this channel once per Tick?");
                return;
            }
        }

        let mut msg_map = MessageMap::new();
        msg_map.insert(message);
        self.buffer.push_front((message_tick, msg_map));

        // Enforce the capacity bound by discarding from the back.
        while self.buffer.len() > self.capacity {
            self.buffer.pop_back();
        }
    }

    /// Drops entries from the back while their tick is strictly older than
    /// `until_tick`. The entry for `until_tick` itself, and anything newer, is kept.
    pub fn pop_back_until_excluding(&mut self, until_tick: &Tick) {
        while let Some((old_tick, _)) = self.buffer.back() {
            // Stop at `until_tick` itself: the bound is exclusive, so that entry
            // must survive the prune.
            if *old_tick == *until_tick || sequence_less_than(*until_tick, *old_tick) {
                return;
            }
            self.buffer.pop_back();
        }
    }

    /// Clears the message at `message_index` within the entry for `tick`, if such
    /// an entry exists.
    ///
    /// Scans from the back towards the front and stops early once an entry newer
    /// than `tick` is seen.
    pub fn remove_message(&mut self, tick: &Tick, message_index: &ShortMessageIndex) {
        // Walk indices high-to-low so that removing an entry never shifts the
        // positions still to be visited.
        for index in (0..self.buffer.len()).rev() {
            let mut remove = false;

            if let Some((old_tick, message_map)) = self.buffer.get_mut(index) {
                if *old_tick == *tick {
                    message_map.remove(message_index);
                    // NOTE(review): `MessageMap::len` counts cleared slots too, so
                    // this is never 0 for a map that has received a message —
                    // confirm whether empty entries are meant to be dropped here.
                    remove = message_map.len() == 0;
                } else if sequence_greater_than(*old_tick, *tick) {
                    // Entries closer to the front are newer still; nothing to find.
                    return;
                }
            }

            if remove {
                self.buffer.remove(index);
            }
        }
    }

    /// Iterates over the tick entries from front to back.
    pub fn iter(&self) -> impl Iterator<Item = &(Tick, MessageMap)> {
        self.buffer.iter()
    }
}