use bevy::utils::HashMap;
use std::collections::VecDeque;
use bytes::Bytes;
use crate::channel::senders::fragment_ack_receiver::FragmentAckReceiver;
use crate::channel::senders::fragment_sender::FragmentSender;
use crate::channel::senders::ChannelSend;
use crate::packet::message::{FragmentData, FragmentIndex, MessageAck, MessageId, SingleData};
use crate::shared::ping::manager::PingManager;
use crate::shared::tick_manager::TickManager;
use crate::shared::time_manager::{TimeManager, WrappedTime};
use crossbeam_channel::{Receiver, Sender};
const DISCARD_AFTER: chrono::Duration = chrono::Duration::milliseconds(3000);
pub struct UnorderedUnreliableWithAcksSender {
single_messages_to_send: VecDeque<SingleData>,
fragmented_messages_to_send: VecDeque<FragmentData>,
next_send_message_id: MessageId,
fragment_sender: FragmentSender,
ack_senders: Vec<Sender<MessageId>>,
fragment_ack_receiver: FragmentAckReceiver,
current_time: WrappedTime,
}
impl UnorderedUnreliableWithAcksSender {
pub(crate) fn new() -> Self {
Self {
single_messages_to_send: VecDeque::new(),
fragmented_messages_to_send: VecDeque::new(),
next_send_message_id: MessageId::default(),
fragment_sender: FragmentSender::new(),
ack_senders: Vec::new(),
fragment_ack_receiver: FragmentAckReceiver::new(),
current_time: WrappedTime::default(),
}
}
}
impl ChannelSend for UnorderedUnreliableWithAcksSender {
fn update(&mut self, time_manager: &TimeManager, _: &PingManager, _: &TickManager) {
self.current_time = time_manager.current_time();
self.fragment_ack_receiver
.cleanup(self.current_time - DISCARD_AFTER);
}
fn buffer_send(&mut self, message: Bytes) -> Option<MessageId> {
let message_id = self.next_send_message_id;
if message.len() > self.fragment_sender.fragment_size {
let fragments = self
.fragment_sender
.build_fragments(message_id, None, message);
self.fragment_ack_receiver
.add_new_fragment_to_wait_for(message_id, fragments.len());
for fragment in fragments {
self.fragmented_messages_to_send.push_back(fragment);
}
} else {
let single_data = SingleData::new(Some(message_id), message);
self.single_messages_to_send.push_back(single_data);
}
self.next_send_message_id += 1;
Some(message_id)
}
fn send_packet(&mut self) -> (VecDeque<SingleData>, VecDeque<FragmentData>) {
(
std::mem::take(&mut self.single_messages_to_send),
std::mem::take(&mut self.fragmented_messages_to_send),
)
}
fn collect_messages_to_send(&mut self) {}
fn notify_message_delivered(&mut self, ack: &MessageAck) {
ack.fragment_id.map_or_else(
|| {
for sender in &self.ack_senders {
sender.send(ack.message_id).unwrap();
}
},
|fragment_index| {
if self.fragment_ack_receiver.receive_fragment_ack(
ack.message_id,
fragment_index,
None,
) {
for sender in &self.ack_senders {
sender.send(ack.message_id).unwrap();
}
}
},
);
}
fn has_messages_to_send(&self) -> bool {
!self.single_messages_to_send.is_empty() || !self.fragmented_messages_to_send.is_empty()
}
fn subscribe_acks(&mut self) -> Receiver<MessageId> {
let (sender, receiver) = crossbeam_channel::unbounded();
self.ack_senders.push(sender);
receiver
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::channel::senders::fragment_ack_receiver::FragmentAckTracker;
use crate::packet::packet::FRAGMENT_SIZE;
use crossbeam_channel::TryRecvError;
#[test]
fn test_receive_ack() {
let mut sender = UnorderedUnreliableWithAcksSender::new();
let receiver = sender.subscribe_acks();
let message_id = sender.buffer_send(Bytes::from("hello")).unwrap();
assert_eq!(message_id, MessageId(0));
assert_eq!(sender.next_send_message_id, MessageId(1));
sender.notify_message_delivered(&MessageAck {
message_id,
fragment_id: None,
});
assert_eq!(receiver.try_recv().unwrap(), message_id);
const NUM_BYTES: usize = (FRAGMENT_SIZE as f32 * 1.5) as usize;
let bytes = Bytes::from(vec![0; NUM_BYTES]);
let message_id = sender.buffer_send(bytes).unwrap();
assert_eq!(message_id, MessageId(1));
let mut expected = FragmentAckReceiver::new();
expected.add_new_fragment_to_wait_for(message_id, 2);
assert_eq!(&sender.fragment_ack_receiver, &expected);
sender.notify_message_delivered(&MessageAck {
message_id,
fragment_id: Some(0),
});
assert!(receiver.try_recv().unwrap_err().is_empty());
sender.notify_message_delivered(&MessageAck {
message_id,
fragment_id: Some(1),
});
assert_eq!(receiver.try_recv().unwrap(), message_id);
}
}