use crate::*;
use enum_dispatch::*;
#[enum_dispatch]
pub(crate) trait ChannelT {
fn id(&self) -> u8;
fn update(&mut self, time: f64);
fn any_ready_to_send(&self) -> bool;
fn enqueue_message(
&mut self,
pool: &BufPool,
id: MessageId,
payload: &[u8],
fragmented: Fragmented,
);
fn message_ack_received(&mut self, msg_handle: &MessageHandle);
fn get_message_to_write_to_a_packet(&mut self, max_size: usize) -> Option<Message>;
fn accepts_message(&mut self, msg: &Message) -> bool;
}
#[enum_dispatch(ChannelT)]
pub(crate) enum Channel {
UnreliableChannel,
ReliableChannel,
}
pub(crate) struct UnreliableChannel {
time: f64,
pub(crate) id: u8,
q: VecDeque<Message>,
seen_buf: SequenceBuffer<bool>,
}
impl UnreliableChannel {
pub(crate) fn new(id: u8, time: f64) -> Self {
Self {
id,
time,
q: VecDeque::default(),
seen_buf: SequenceBuffer::with_capacity(10000),
}
}
}
impl ChannelT for UnreliableChannel {
fn update(&mut self, dt: f64) {
self.time += dt;
}
fn accepts_message(&mut self, msg: &Message) -> bool {
if !self.seen_buf.check_sequence(msg.id().0) {
warn!("Rejecting too-old message on chanel {msg:?}");
return false;
}
if self.seen_buf.exists(msg.id().0) {
return false;
}
match self.seen_buf.insert(msg.id().0, true) {
Ok(_) => true,
Err(e) => {
warn!("not accepting message {msg:?} = {e:?}");
false
}
}
}
fn enqueue_message(
&mut self,
pool: &BufPool,
id: MessageId,
payload: &[u8],
fragmented: Fragmented,
) {
let msg = Message::new_outbound(pool, id, self.id(), payload, fragmented);
info!(">>>>> unreliable chan enq msg: {msg:?}");
self.q.push_back(msg);
}
fn any_ready_to_send(&self) -> bool {
!self.q.is_empty()
}
fn id(&self) -> u8 {
self.id
}
fn message_ack_received(&mut self, _handle: &MessageHandle) {}
fn get_message_to_write_to_a_packet(&mut self, max_size: usize) -> Option<Message> {
for index in 0..self.q.len() {
if self.q[index].size() > max_size {
continue;
}
return self.q.remove(index);
}
None
}
}
struct ResendableMessage {
message: Message,
last_sent: Option<f64>,
}
impl ResendableMessage {
fn new(message: Message) -> Self {
Self {
message,
last_sent: None,
}
}
fn is_ready(&self, now: f64, cutoff: f64) -> bool {
if self.last_sent.is_none() || (now - self.last_sent.unwrap()) >= cutoff {
return true;
}
false
}
fn dismissed_by_ack(&self, acked_handle: &MessageHandle) -> bool {
if self.message.id() != acked_handle.id() {
return false;
}
if let Some(fragment) = self.message.fragment() {
Some(fragment.parent_id) == acked_handle.parent_id()
} else {
assert!(acked_handle.frag_index.is_none());
true
}
}
}
pub(crate) struct ReliableChannel {
time: f64,
pub(crate) id: u8,
q: VecDeque<ResendableMessage>,
resend_time: f64,
seen_buf: SequenceBuffer<bool>,
}
impl ReliableChannel {
pub(crate) fn new(id: u8, time: f64) -> Self {
Self {
id,
time,
q: VecDeque::default(),
resend_time: 0.1,
seen_buf: SequenceBuffer::with_capacity(10000),
}
}
}
impl ChannelT for ReliableChannel {
fn update(&mut self, dt: f64) {
self.time += dt;
}
fn accepts_message(&mut self, msg: &Message) -> bool {
if !self.seen_buf.check_sequence(msg.id().0) {
warn!("Rejecting too-old message on chanel {msg:?}");
return false;
}
if self.seen_buf.exists(msg.id().0) {
return false;
}
match self.seen_buf.insert(msg.id().0, true) {
Ok(_) => true,
Err(e) => {
warn!("not accepting message {msg:?} = {e:?}");
false
}
}
}
fn enqueue_message(
&mut self,
pool: &BufPool,
id: MessageId,
payload: &[u8],
fragmented: Fragmented,
) {
let msg = Message::new_outbound(pool, id, self.id(), payload, fragmented);
self.q.push_back(ResendableMessage::new(msg));
}
fn any_ready_to_send(&self) -> bool {
let now = self.time;
let cutoff = self.resend_time;
self.q.iter().any(|m| m.is_ready(now, cutoff))
}
fn id(&self) -> u8 {
self.id
}
fn message_ack_received(&mut self, msg_handle: &MessageHandle) {
self.q.retain(|m| !m.dismissed_by_ack(msg_handle));
}
fn get_message_to_write_to_a_packet(&mut self, max_size: usize) -> Option<Message> {
for index in 0..self.q.len() {
let re_msg = self.q.get_mut(index).unwrap();
if re_msg.message.size() > max_size {
continue;
}
if re_msg.is_ready(self.time, self.resend_time) {
if re_msg.last_sent.is_some() {
info!("resending.. {:?}", re_msg.message.fragment());
}
re_msg.last_sent = Some(self.time);
return Some(re_msg.message.clone());
}
}
None
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn unreliable_channel() {
crate::test_utils::init_logger();
let pool = BufPool::empty();
let mut channel = UnreliableChannel::new(0, 1.0);
let payload = b"hello";
channel.enqueue_message(&pool, MessageId(0), payload, Fragmented::No);
assert!(channel.any_ready_to_send());
assert!(channel.get_message_to_write_to_a_packet(999999).is_some());
channel.update(1.0);
assert!(!channel.any_ready_to_send());
}
#[test]
fn reliable_channel() {
crate::test_utils::init_logger();
let pool = BufPool::empty();
let channel_id = 0;
let mut channel = ReliableChannel::new(channel_id, 1.0);
let payload = b"hello";
let message_id = MessageId(123);
channel.enqueue_message(&pool, message_id, payload, Fragmented::No);
assert!(channel.any_ready_to_send());
assert!(channel.get_message_to_write_to_a_packet(999999).is_some());
assert!(!channel.any_ready_to_send());
channel.update(1.0);
assert!(channel.any_ready_to_send());
assert!(channel.get_message_to_write_to_a_packet(999999).is_some());
let handle = MessageHandle {
id: message_id,
frag_index: None,
channel: channel_id,
};
channel.message_ack_received(&handle);
channel.update(1.0);
assert!(!channel.any_ready_to_send());
}
}
#[derive(Default)]
pub(crate) struct ChannelList {
channels: smallmap::Map<u8, Channel>,
}
impl ChannelList {
pub(crate) fn get_mut(&mut self, id: u8) -> Option<&mut Channel> {
self.channels.get_mut(&id)
}
pub(crate) fn insert(&mut self, channel: Channel) {
assert!(
(channel.id() as usize) < MAX_CHANNELS,
"channel.id exceeds max"
);
self.channels.insert(channel.id(), channel);
}
pub(crate) fn all_mut(&mut self) -> impl Iterator<Item = &mut Channel> {
self.channels.values_mut()
}
pub(crate) fn all_non_empty_mut(&mut self) -> impl Iterator<Item = &mut Channel> {
self.channels.values_mut().filter(|c| c.any_ready_to_send())
}
pub(crate) fn any_with_messages_to_send(&self) -> bool {
self.channels.values().any(|ch| ch.any_ready_to_send())
}
}