use std::collections::HashSet;
use tracing::{debug, info, trace};
use slim_datapath::{api::ProtoMessage as Message, messages::utils::MAX_PUBLISH_ID};
/// Reordering buffer for a stream of messages identified by wrapping sequence ids.
///
/// Messages are handed back to the caller in id order. Gaps are reported as ids to
/// retransmit, and gaps that are later declared lost are released as `None`.
pub(crate) struct ReceiverBuffer {
/// Id of the last entry released to the caller (a message or a declared loss).
/// `usize::MAX` is used as the "nothing released yet" marker.
last_sent: usize,
/// Index in `buffer` of the slot that holds id `last_sent + 1`.
/// Slots before this index have already been released.
first_entry: usize,
/// Ids reported through `on_lost_message` that have not been released yet.
lost_msgs: HashSet<usize>,
/// Slots for consecutive ids starting after `last_sent`; `None` marks an id
/// that has not been received (yet).
buffer: Vec<Option<Message>>,
}
impl Default for ReceiverBuffer {
fn default() -> Self {
ReceiverBuffer {
last_sent: usize::MAX,
first_entry: 0,
lost_msgs: HashSet::new(),
buffer: vec![],
}
}
}
impl ReceiverBuffer {
/// Largest valid message id, taken from `MAX_PUBLISH_ID`; id arithmetic in this
/// type is done modulo `MAX_ID + 1`, so the id following `MAX_ID` is 0.
const MAX_ID: usize = MAX_PUBLISH_ID as usize;
/// Number of increments needed to go from `from` to `to`, wrapping past
/// `MAX_ID` back to 0 when `to` is numerically smaller than `from`.
fn id_distance(from: usize, to: usize) -> usize {
    if from <= to {
        return to - from;
    }
    // Steps left until the id space wraps to 0, then the steps from 0 up to `to`.
    let steps_until_wrap = Self::MAX_ID + 1 - from;
    steps_until_wrap + to
}
/// Returns `true` when `a` comes strictly after `b` in wrap-around order.
///
/// `a` counts as "after" when it is between 1 and `MAX_ID.div_ceil(2)` steps
/// ahead of `b`; anything farther ahead is treated as being behind `b`.
fn is_after(a: usize, b: usize) -> bool {
    let steps_ahead = Self::id_distance(b, a);
    let half_window = Self::MAX_ID.div_ceil(2);
    (1..=half_window).contains(&steps_ahead)
}
/// Advances `base` by `offset` ids, wrapping modulo `MAX_ID + 1`.
fn add_with_wraparound(base: usize, offset: usize) -> usize {
    let unwrapped = base + offset;
    unwrapped % (Self::MAX_ID + 1)
}
/// Feeds a received message into the buffer.
///
/// Returns the entries that can now be handed to the application, in order
/// (`None` entries stand for ids that were declared lost), together with the
/// ids for which a retransmission should be requested.
pub fn on_received_message(&mut self, msg: Message) -> (Vec<Option<Message>>, Vec<u32>) {
    let id = msg.get_id() as usize;
    self.internal_on_received_message(id, Some(msg))
}
pub fn on_lost_message(&mut self, msg_id: u32) -> Vec<Option<Message>> {
debug!(id = %msg_id, "message definitely lost");
self.lost_msgs.insert(msg_id as usize);
self.release_msgs()
}
/// Handles a beacon, i.e. a notification that `msg_id` exists without carrying
/// its payload.
///
/// Returns the ids for which a retransmission should be requested; a beacon
/// never releases messages to the caller.
#[allow(dead_code)]
pub fn on_beacon_message(&mut self, msg_id: u32) -> Vec<u32> {
    debug!(id = %msg_id,"received beacon");
    let beacon_id = msg_id as usize;
    self.internal_on_received_message(beacon_id, None).1
}
/// Tells whether `msg_id` has already been handled by this buffer.
///
/// Returns `true` when the id is not ahead of the last released id (it was
/// released as a message or as a loss) or when its payload is currently parked
/// in the buffer. Returns `false` when nothing has been released yet, when the
/// id lies beyond the buffered range, or when its slot is still empty.
#[allow(dead_code)]
pub fn message_already_received(&self, msg_id: usize) -> bool {
    // Nothing released so far: no id can have been seen.
    if self.last_sent == usize::MAX {
        return false;
    }

    // Ids at or behind the release point were already handed to the caller.
    if !Self::is_after(msg_id, self.last_sent) {
        return true;
    }

    // From here on `msg_id` is ahead of the release point; it can only have
    // been seen if it sits inside the buffered range.
    let pending_slots = self.buffer.len() - self.first_entry;
    let last_buffered_id = Self::add_with_wraparound(self.last_sent, pending_slots);
    if self.buffer.is_empty() || Self::is_after(msg_id, last_buffered_id) {
        return false;
    }

    let first_buffered_id = Self::add_with_wraparound(self.last_sent, 1);
    let slot = self.first_entry + Self::id_distance(first_buffered_id, msg_id);
    self.buffer[slot].is_some()
}
/// Shared handling for data messages (`msg` is `Some`) and beacons (`msg` is `None`).
///
/// Returns `(released, rtx)`:
/// * `released` - entries that can be handed to the caller now, in id order;
///   `None` entries stand for ids that were declared lost.
/// * `rtx` - ids for which a retransmission should be requested.
///
/// Behaviour by case:
/// * Nothing released yet, or `msg_id` is exactly the next id and nothing is
///   buffered: a message is released immediately; a beacon asks for its own id.
/// * `msg_id` is not ahead of the last released id: dropped as a duplicate.
/// * `msg_id` is ahead and leaves a gap: a slot is created for every missing id,
///   the missing ids are returned in `rtx`, and the message (if any) is parked.
///   A beacon also gets a slot for its own id, so that a repeated beacon for the
///   same id falls inside the buffered range and does not request it again.
/// * `msg_id` falls inside the buffered range: a message fills its slot and
///   everything that became contiguous is released; a beacon is ignored.
fn internal_on_received_message(
    &mut self,
    msg_id: usize,
    msg: Option<Message>,
) -> (Vec<Option<Message>>, Vec<u32>) {
    debug!(id = %msg_id, "Received message");

    // Fast path. The `||` short-circuit keeps the `usize::MAX` marker out of the
    // id arithmetic.
    let in_order = self.last_sent == usize::MAX
        || (self.buffer.is_empty()
            && msg_id == Self::add_with_wraparound(self.last_sent, 1));
    if in_order {
        return match msg {
            Some(m) => {
                debug!(id = %msg_id, "No loss detected, return message");
                self.last_sent = msg_id;
                (vec![Some(m)], vec![])
            }
            // A beacon for the id we are waiting for: ask for that id.
            None => (vec![], vec![msg_id as u32]),
        };
    }

    if !Self::is_after(msg_id, self.last_sent) {
        debug!("Received possibly DUP message or beacon for a received message, drop it");
        return (vec![], vec![]);
    }

    // `last_sent` is a real id from here on.
    let next_expected = Self::add_with_wraparound(self.last_sent, 1);

    if self.buffer.is_empty() {
        // First gap after an in-order run: rebuild the buffer from scratch.
        self.first_entry = 0;
        let num_missing = Self::id_distance(next_expected, msg_id);
        // The id of a beacon is itself missing, so it needs a slot and a request too.
        let num_holes = if msg.is_some() {
            num_missing
        } else {
            num_missing + 1
        };

        self.buffer = vec![None; num_holes];
        debug!(losses = %self.buffer.len(), "Losses found");

        let mut rtx: Vec<u32> = Vec::with_capacity(num_holes);
        let mut current = next_expected;
        for _ in 0..num_holes {
            trace!(%current, "add to rtx vector");
            rtx.push(current as u32);
            current = Self::add_with_wraparound(current, 1);
        }

        if let Some(m) = msg {
            self.buffer.push(Some(m));
        }
        return (vec![], rtx);
    }

    debug!(
        id = %msg_id,
        "buffer is not empty and received OOO packet, process it",
    );
    trace!(
        last_sent = %self.last_sent,
        first_entry = %self.first_entry,
        len = %self.buffer.len(),
        "buffer status",
    );

    let buffer_end =
        Self::add_with_wraparound(self.last_sent, self.buffer.len() - self.first_entry);

    // `msg_id` is already known to be after `last_sent`, so it is inside the
    // buffered range exactly when it is not past the last buffered id.
    if !Self::is_after(msg_id, buffer_end) {
        debug!(
            id = %msg_id,
            %next_expected,
            %buffer_end,
            "message is inside the buffer range",
        );

        // A beacon for an id we already track carries no new information.
        let Some(m) = msg else {
            return (vec![], vec![]);
        };

        let pos = Self::id_distance(next_expected, msg_id) + self.first_entry;
        debug!(%msg_id, %pos, "try to insert message");
        if self.buffer[pos].is_some() {
            info!("Received DUP message, drop it");
            return (vec![], vec![]);
        }

        debug!(
            %msg_id,
            %pos,
            "add message and try to release msgs",
        );
        self.buffer[pos] = Some(m);
        return (self.release_msgs(), vec![]);
    }

    // `msg_id` lies past the buffered range: extend the buffer with one empty
    // slot per id between the current end and `msg_id`.
    let mut rtx = Vec::new();
    let buffer_next = Self::add_with_wraparound(buffer_end, 1);
    let num_new_entries = Self::id_distance(buffer_next, msg_id);
    let mut current = buffer_next;
    for _ in 0..num_new_entries {
        self.buffer.push(None);
        rtx.push(current as u32);
        debug!(
            %current,
            "detect packet loss to add at the end of the buffer",
        );
        current = Self::add_with_wraparound(current, 1);
    }

    match msg {
        Some(m) => {
            debug!(id = %msg_id,"add packet at the end of the buffer");
            self.buffer.push(Some(m));
        }
        None => {
            // Reserve the beacon's own slot, as the empty-buffer path does;
            // otherwise every repeated beacon would request this id again.
            self.buffer.push(None);
            rtx.push(msg_id as u32);
        }
    }
    (vec![], rtx)
}
/// Releases every entry that is now contiguous with the last released id.
///
/// Starting at `first_entry`, buffered messages are returned as `Some(msg)` and
/// empty slots whose id was declared lost are returned as `None`. Releasing
/// stops at the first empty slot that has not been declared lost. Once the
/// buffer is fully drained it is reset, and any further ids already declared
/// lost right after the release point are released as `None` as well.
///
/// Returns an empty vector when nothing has been released yet
/// (`last_sent == usize::MAX`): there is no reference id to advance from, and
/// computing `last_sent + 1` on the marker value would overflow.
fn release_msgs(&mut self) -> Vec<Option<Message>> {
    let mut ret = vec![];

    if self.last_sent == usize::MAX {
        return ret;
    }

    // The slot at `first_entry` always holds id `last_sent + 1`, so both
    // advance together.
    while self.first_entry < self.buffer.len() {
        let pos = self.first_entry;
        let next_id = Self::add_with_wraparound(self.last_sent, 1);

        match self.buffer[pos].take() {
            Some(m) => {
                ret.push(Some(m));
                // The message may have arrived after being declared lost; drop
                // the now meaningless loss record so it cannot linger.
                self.lost_msgs.remove(&next_id);
                self.last_sent = next_id;
                self.first_entry += 1;
                debug!(
                    pos = %pos,
                    last_sent = %self.last_sent,
                    first_index = %self.first_entry,
                    "return message at pos, new buffer state",
                );
            }
            // `HashSet::remove` reports whether the id was present, so the
            // lookup and the removal are a single operation.
            None if self.lost_msgs.remove(&next_id) => {
                ret.push(None);
                self.last_sent = next_id;
                self.first_entry += 1;
                debug!(
                    lost_message = self.last_sent,
                    last_sent = %self.last_sent,
                    first_index = %self.first_entry,
                    "message lost, return none, new buffer state",
                );
            }
            // Still waiting for this id: nothing further can be released.
            None => break,
        }
    }

    if self.first_entry == self.buffer.len() {
        debug!("clean reception buffer which is empty now");
        self.first_entry = 0;
        self.buffer = vec![];
    }

    // Ids declared lost that never had a slot can follow the release point
    // directly; release them too.
    loop {
        let next_id = Self::add_with_wraparound(self.last_sent, 1);
        if !self.lost_msgs.remove(&next_id) {
            break;
        }
        self.last_sent = next_id;
        ret.push(None);
        debug!(
            last_sent = %self.last_sent,
            "found another lost message to release",
        );
    }

    ret
}
}
#[cfg(test)]
mod tests {
use slim_datapath::api::ProtoName as Name;
use slim_datapath::api::{
ProtoSessionMessageType, ProtoSessionType, SessionHeader, SlimHeader,
};
use tracing_test::traced_test;
use super::*;
// Exercises the receiver buffer with ids 0..=5: in-order delivery, duplicates,
// gaps with retransmission requests, declared losses, beacons and the
// `message_already_received` query.
#[test]
#[traced_test]
fn test_receiver_buffer() {
// Common routing header shared by every test message.
let src = Name::from_strings(["org", "ns", "type"]).with_id(0);
let src_id = src.to_string();
let name_type = Name::from_strings(["org", "ns", "type"]).with_id(1);
let slim_header = SlimHeader::new(src, name_type, &src_id, None);
// Session headers h0..h5 differ only in their last argument (0..=5), which is
// the message id the assertions below refer to.
let h0 = SessionHeader::new(
ProtoSessionType::PointToPoint.into(),
ProtoSessionMessageType::Msg.into(),
0,
0,
);
let h1 = SessionHeader::new(
ProtoSessionType::PointToPoint.into(),
ProtoSessionMessageType::Msg.into(),
0,
1,
);
let h2 = SessionHeader::new(
ProtoSessionType::PointToPoint.into(),
ProtoSessionMessageType::Msg.into(),
0,
2,
);
let h3 = SessionHeader::new(
ProtoSessionType::PointToPoint.into(),
ProtoSessionMessageType::Msg.into(),
0,
3,
);
let h4 = SessionHeader::new(
ProtoSessionType::PointToPoint.into(),
ProtoSessionMessageType::Msg.into(),
0,
4,
);
let h5 = SessionHeader::new(
ProtoSessionType::PointToPoint.into(),
ProtoSessionMessageType::Msg.into(),
0,
5,
);
// Publish messages p0..p5 with an empty payload, one per header above.
let p0 = Message::builder()
.with_slim_header(slim_header.clone())
.with_session_header(h0)
.application_payload("", vec![])
.build_publish()
.unwrap();
let p1 = Message::builder()
.with_slim_header(slim_header.clone())
.with_session_header(h1)
.application_payload("", vec![])
.build_publish()
.unwrap();
let p2 = Message::builder()
.with_slim_header(slim_header.clone())
.with_session_header(h2)
.application_payload("", vec![])
.build_publish()
.unwrap();
let p3 = Message::builder()
.with_slim_header(slim_header.clone())
.with_session_header(h3)
.application_payload("", vec![])
.build_publish()
.unwrap();
let p4 = Message::builder()
.with_slim_header(slim_header.clone())
.with_session_header(h4)
.application_payload("", vec![])
.build_publish()
.unwrap();
let p5 = Message::builder()
.with_slim_header(slim_header.clone())
.with_session_header(h5)
.application_payload("", vec![])
.build_publish()
.unwrap();
// Scenario 1: ids 0..=4 arrive in order; each one is released immediately
// and no retransmission is requested.
let mut buffer = ReceiverBuffer::default();
let (recv, rtx) = buffer.on_received_message(p0.clone());
assert_eq!(recv.len(), 1);
assert_eq!(rtx.len(), 0);
assert_eq!(recv[0], Some(p0.clone()));
let (recv, rtx) = buffer.on_received_message(p1.clone());
assert_eq!(recv.len(), 1);
assert_eq!(rtx.len(), 0);
assert_eq!(recv[0], Some(p1.clone()));
let (recv, rtx) = buffer.on_received_message(p2.clone());
assert_eq!(recv.len(), 1);
assert_eq!(rtx.len(), 0);
assert_eq!(recv[0], Some(p2.clone()));
let (recv, rtx) = buffer.on_received_message(p3.clone());
assert_eq!(recv.len(), 1);
assert_eq!(rtx.len(), 0);
assert_eq!(recv[0], Some(p3.clone()));
let (recv, rtx) = buffer.on_received_message(p4.clone());
assert_eq!(recv.len(), 1);
assert_eq!(rtx.len(), 0);
assert_eq!(recv[0], Some(p4.clone()));
// Scenario 2: the first message seen has id 2; the stream is accepted from
// there without asking for ids 0 and 1.
let mut buffer = ReceiverBuffer::default();
let (recv, rtx) = buffer.on_received_message(p2.clone());
assert_eq!(recv.len(), 1);
assert_eq!(rtx.len(), 0);
assert_eq!(recv[0], Some(p2.clone()));
let (recv, rtx) = buffer.on_received_message(p3.clone());
assert_eq!(recv.len(), 1);
assert_eq!(rtx.len(), 0);
assert_eq!(recv[0], Some(p3.clone()));
let (recv, rtx) = buffer.on_received_message(p4.clone());
assert_eq!(recv.len(), 1);
assert_eq!(rtx.len(), 0);
assert_eq!(recv[0], Some(p4.clone()));
// Scenario 3: after id 4 is released, a duplicate of 4 and the older id 0
// are both dropped silently.
let mut buffer = ReceiverBuffer::default();
let (recv, rtx) = buffer.on_received_message(p4.clone());
assert_eq!(recv.len(), 1);
assert_eq!(rtx.len(), 0);
assert_eq!(recv[0], Some(p4.clone()));
let (recv, rtx) = buffer.on_received_message(p4.clone());
assert_eq!(recv.len(), 0);
assert_eq!(rtx.len(), 0);
let (recv, rtx) = buffer.on_received_message(p0.clone());
assert_eq!(recv.len(), 0);
assert_eq!(rtx.len(), 0);
// Scenario 4: 1 then 4 opens a gap; ids 2 and 3 are requested. Duplicates of
// buffered or released messages are dropped, 2 is released on arrival and 3
// releases itself together with the parked 4.
let mut buffer = ReceiverBuffer::default();
let (recv, rtx) = buffer.on_received_message(p1.clone());
assert_eq!(recv.len(), 1);
assert_eq!(rtx.len(), 0);
assert_eq!(recv[0], Some(p1.clone()));
let (recv, rtx) = buffer.on_received_message(p4.clone());
assert_eq!(recv.len(), 0);
assert_eq!(rtx.len(), 2);
assert_eq!(rtx[0], 2);
assert_eq!(rtx[1], 3);
let (recv, rtx) = buffer.on_received_message(p4.clone());
assert_eq!(recv.len(), 0);
assert_eq!(rtx.len(), 0);
let (recv, rtx) = buffer.on_received_message(p2.clone());
assert_eq!(recv.len(), 1);
assert_eq!(rtx.len(), 0);
assert_eq!(recv[0], Some(p2.clone()));
let (recv, rtx) = buffer.on_received_message(p2.clone());
assert_eq!(recv.len(), 0);
assert_eq!(rtx.len(), 0);
let (recv, rtx) = buffer.on_received_message(p3.clone());
assert_eq!(recv.len(), 2);
assert_eq!(rtx.len(), 0);
assert_eq!(recv[0], Some(p3.clone()));
assert_eq!(recv[1], Some(p4.clone()));
// Scenario 5: two successive gaps (0, 2, 5). Messages 2..=5 stay parked until
// the missing head (id 1) arrives, then everything is released in order.
let mut buffer = ReceiverBuffer::default();
let (recv, rtx) = buffer.on_received_message(p0.clone());
assert_eq!(recv.len(), 1);
assert_eq!(rtx.len(), 0);
assert_eq!(recv[0], Some(p0.clone()));
let (recv, rtx) = buffer.on_received_message(p2.clone());
assert_eq!(recv.len(), 0);
assert_eq!(rtx.len(), 1);
assert_eq!(rtx[0], 1);
let (recv, rtx) = buffer.on_received_message(p5.clone());
assert_eq!(recv.len(), 0);
assert_eq!(rtx.len(), 2);
assert_eq!(rtx[0], 3);
assert_eq!(rtx[1], 4);
let (recv, rtx) = buffer.on_received_message(p2.clone());
assert_eq!(recv.len(), 0);
assert_eq!(rtx.len(), 0);
let (recv, rtx) = buffer.on_received_message(p3.clone());
assert_eq!(recv.len(), 0);
assert_eq!(rtx.len(), 0);
let (recv, rtx) = buffer.on_received_message(p4.clone());
assert_eq!(recv.len(), 0);
assert_eq!(rtx.len(), 0);
let (recv, rtx) = buffer.on_received_message(p1.clone());
assert_eq!(recv.len(), 5);
assert_eq!(rtx.len(), 0);
assert_eq!(recv[0], Some(p1.clone()));
assert_eq!(recv[1], Some(p2.clone()));
assert_eq!(recv[2], Some(p3.clone()));
assert_eq!(recv[3], Some(p4.clone()));
assert_eq!(recv[4], Some(p5.clone()));
// Scenario 6: declared losses. Losing id 1 releases `None` followed by the
// parked 2. Losing id 5 (not reachable yet) releases nothing until 3 arrives,
// which then releases 3, 4 and the `None` for 5.
let mut buffer = ReceiverBuffer::default();
let (recv, rtx) = buffer.on_received_message(p0.clone());
assert_eq!(recv.len(), 1);
assert_eq!(rtx.len(), 0);
assert_eq!(recv[0], Some(p0.clone()));
let (recv, rtx) = buffer.on_received_message(p2.clone());
assert_eq!(recv.len(), 0);
assert_eq!(rtx.len(), 1);
assert_eq!(rtx[0], 1);
let (recv, rtx) = buffer.on_received_message(p4.clone());
assert_eq!(recv.len(), 0);
assert_eq!(rtx.len(), 1);
assert_eq!(rtx[0], 3);
let recv = buffer.on_lost_message(1);
assert_eq!(recv.len(), 2);
assert_eq!(recv[0], None);
assert_eq!(recv[1], Some(p2.clone()));
let recv = buffer.on_lost_message(5);
assert_eq!(recv.len(), 0);
let (recv, rtx) = buffer.on_received_message(p3.clone());
assert_eq!(recv.len(), 3);
assert_eq!(rtx.len(), 0);
assert_eq!(recv[0], Some(p3.clone()));
assert_eq!(recv[1], Some(p4.clone()));
assert_eq!(recv[2], None);
// Scenario 7: beacons. A beacon for id 2 after id 0 requests ids 1 and 2; a
// beacon for an id that already has a slot requests nothing. Messages then
// fill the slots and are released once contiguous.
let mut buffer = ReceiverBuffer::default();
let (recv, rtx) = buffer.on_received_message(p0.clone());
assert_eq!(recv.len(), 1);
assert_eq!(rtx.len(), 0);
assert_eq!(recv[0], Some(p0.clone()));
let rtx = buffer.on_beacon_message(2);
assert_eq!(rtx.len(), 2);
assert_eq!(rtx[0], 1);
assert_eq!(rtx[1], 2);
let rtx = buffer.on_beacon_message(1);
assert_eq!(rtx.len(), 0);
let (recv, rtx) = buffer.on_received_message(p2.clone());
assert_eq!(recv.len(), 0);
assert_eq!(rtx.len(), 0);
let (recv, rtx) = buffer.on_received_message(p1.clone());
assert_eq!(recv.len(), 2);
assert_eq!(rtx.len(), 0);
assert_eq!(recv[0], Some(p1.clone()));
assert_eq!(recv[1], Some(p2.clone()));
let (recv, rtx) = buffer.on_received_message(p4.clone());
assert_eq!(recv.len(), 0);
assert_eq!(rtx.len(), 1);
assert_eq!(rtx[0], 3);
let rtx = buffer.on_beacon_message(3);
assert_eq!(rtx.len(), 0);
let (recv, rtx) = buffer.on_received_message(p3.clone());
assert_eq!(recv.len(), 2);
assert_eq!(rtx.len(), 0);
assert_eq!(recv[0], Some(p3.clone()));
assert_eq!(recv[1], Some(p4.clone()));
let (recv, rtx) = buffer.on_received_message(p5.clone());
assert_eq!(recv.len(), 1);
assert_eq!(rtx.len(), 0);
assert_eq!(recv[0], Some(p5.clone()));
// Scenario 8: `message_already_received` on a fresh buffer reports nothing
// as received.
let buffer = ReceiverBuffer::default();
assert!(!buffer.message_already_received(0));
assert!(!buffer.message_already_received(1));
// After in-order delivery, released ids are reported as received and later
// ids are not.
let mut buffer = ReceiverBuffer::default();
let (_recv, _rtx) = buffer.on_received_message(p0.clone());
assert!(buffer.message_already_received(0));
assert!(!buffer.message_already_received(1));
assert!(!buffer.message_already_received(2));
let (_recv, _rtx) = buffer.on_received_message(p1.clone());
let (_recv, _rtx) = buffer.on_received_message(p2.clone());
assert!(buffer.message_already_received(0));
assert!(buffer.message_already_received(1));
assert!(buffer.message_already_received(2));
assert!(!buffer.message_already_received(3));
// With gaps (0, 2, 4 received): parked ids count as received, empty slots and
// ids past the buffer do not. Once id 1 is declared lost it counts as handled.
let mut buffer = ReceiverBuffer::default();
let (_recv, _rtx) = buffer.on_received_message(p0.clone());
let (_recv, _rtx) = buffer.on_received_message(p2.clone());
let (_recv, _rtx) = buffer.on_received_message(p4.clone());
assert!(buffer.message_already_received(0));
assert!(!buffer.message_already_received(1));
assert!(buffer.message_already_received(2)); assert!(!buffer.message_already_received(3));
assert!(buffer.message_already_received(4)); assert!(!buffer.message_already_received(5));
let _ = buffer.on_lost_message(1);
assert!(buffer.message_already_received(1));
assert!(!buffer.message_already_received(3));
}
// Exercises the receiver buffer around the point where ids wrap from
// `MAX_PUBLISH_ID` back to 0.
#[test]
#[traced_test]
fn test_receiver_buffer_wraparound() {
// Common routing header shared by every test message.
let src = Name::from_strings(["org", "ns", "type"]).with_id(0);
let src_id = src.to_string();
let name_type = Name::from_strings(["org", "ns", "type"]).with_id(1);
let slim_header = SlimHeader::new(src, name_type, &src_id, None);
// Ids on both sides of the wrap: MAX-1, MAX, 0, 1, 2.
let id_max_minus_1 = MAX_PUBLISH_ID - 1;
let id_max = MAX_PUBLISH_ID;
let id_zero = 0;
let id_one = 1;
let id_two = 2;
// One session header per id; the last argument is the message id.
let h_max_minus_1 = SessionHeader::new(
ProtoSessionType::PointToPoint.into(),
ProtoSessionMessageType::Msg.into(),
0,
id_max_minus_1,
);
let h_max = SessionHeader::new(
ProtoSessionType::PointToPoint.into(),
ProtoSessionMessageType::Msg.into(),
0,
id_max,
);
let h_zero = SessionHeader::new(
ProtoSessionType::PointToPoint.into(),
ProtoSessionMessageType::Msg.into(),
0,
id_zero,
);
let h_one = SessionHeader::new(
ProtoSessionType::PointToPoint.into(),
ProtoSessionMessageType::Msg.into(),
0,
id_one,
);
let h_two = SessionHeader::new(
ProtoSessionType::PointToPoint.into(),
ProtoSessionMessageType::Msg.into(),
0,
id_two,
);
// Publish messages with an empty payload, one per header above.
let p_max_minus_1 = Message::builder()
.with_slim_header(slim_header.clone())
.with_session_header(h_max_minus_1)
.application_payload("", vec![])
.build_publish()
.unwrap();
let p_max = Message::builder()
.with_slim_header(slim_header.clone())
.with_session_header(h_max)
.application_payload("", vec![])
.build_publish()
.unwrap();
let p_zero = Message::builder()
.with_slim_header(slim_header.clone())
.with_session_header(h_zero)
.application_payload("", vec![])
.build_publish()
.unwrap();
let p_one = Message::builder()
.with_slim_header(slim_header.clone())
.with_session_header(h_one)
.application_payload("", vec![])
.build_publish()
.unwrap();
let p_two = Message::builder()
.with_slim_header(slim_header.clone())
.with_session_header(h_two)
.application_payload("", vec![])
.build_publish()
.unwrap();
// Scenario 1: in-order delivery across the wrap (MAX-1, MAX, 0, 1); every
// message is released immediately.
let mut buffer = ReceiverBuffer::default();
let (recv, rtx) = buffer.on_received_message(p_max_minus_1.clone());
assert_eq!(recv.len(), 1);
assert_eq!(rtx.len(), 0);
assert_eq!(recv[0], Some(p_max_minus_1.clone()));
let (recv, rtx) = buffer.on_received_message(p_max.clone());
assert_eq!(recv.len(), 1);
assert_eq!(rtx.len(), 0);
assert_eq!(recv[0], Some(p_max.clone()));
let (recv, rtx) = buffer.on_received_message(p_zero.clone());
assert_eq!(recv.len(), 1);
assert_eq!(rtx.len(), 0);
assert_eq!(recv[0], Some(p_zero.clone()));
let (recv, rtx) = buffer.on_received_message(p_one.clone());
assert_eq!(recv.len(), 1);
assert_eq!(rtx.len(), 0);
assert_eq!(recv[0], Some(p_one.clone()));
// Scenario 2: a gap that spans the wrap. After MAX-1, id 1 arrives: ids MAX
// and 0 are requested, then filled, and 0 releases the parked 1 with it.
let mut buffer = ReceiverBuffer::default();
let (recv, rtx) = buffer.on_received_message(p_max_minus_1.clone());
assert_eq!(recv.len(), 1);
assert_eq!(rtx.len(), 0);
let (recv, rtx) = buffer.on_received_message(p_one.clone());
assert_eq!(recv.len(), 0);
assert_eq!(rtx.len(), 2);
assert_eq!(rtx[0], id_max);
assert_eq!(rtx[1], id_zero);
let (recv, rtx) = buffer.on_received_message(p_max.clone());
assert_eq!(recv.len(), 1);
assert_eq!(rtx.len(), 0);
assert_eq!(recv[0], Some(p_max.clone()));
let (recv, rtx) = buffer.on_received_message(p_zero.clone());
assert_eq!(recv.len(), 2);
assert_eq!(rtx.len(), 0);
assert_eq!(recv[0], Some(p_zero.clone()));
assert_eq!(recv[1], Some(p_one.clone()));
// Scenario 3: a wider gap across the wrap. After MAX-1, id 2 arrives: ids
// MAX, 0 and 1 are requested in that order.
let mut buffer = ReceiverBuffer::default();
let (recv, rtx) = buffer.on_received_message(p_max_minus_1.clone());
assert_eq!(recv.len(), 1);
assert_eq!(rtx.len(), 0);
let (recv, rtx) = buffer.on_received_message(p_two.clone());
assert_eq!(recv.len(), 0);
assert_eq!(rtx.len(), 3);
assert_eq!(rtx[0], id_max);
assert_eq!(rtx[1], id_zero);
assert_eq!(rtx[2], id_one);
// Scenario 4: `message_already_received` across the wrap. Ids released before
// and after the wrap are reported as received; the next id is not.
let mut buffer = ReceiverBuffer::default();
let (_recv, _rtx) = buffer.on_received_message(p_max_minus_1.clone());
let (_recv, _rtx) = buffer.on_received_message(p_max.clone());
let (_recv, _rtx) = buffer.on_received_message(p_zero.clone());
assert!(buffer.message_already_received(id_max_minus_1 as usize));
assert!(buffer.message_already_received(id_max as usize));
assert!(buffer.message_already_received(id_zero as usize));
assert!(!buffer.message_already_received(id_one as usize));
}
}