1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
use std::collections::VecDeque;
use crate::{sequence_less_than, MessageIndex};
/// De-duplicating receive buffer for indexed messages.
///
/// The receiver keeps a sliding window (`record`) of message indices starting
/// at `oldest_received_message_index`. Each message is accepted the first time
/// its index is seen and ignored afterwards. Accepted messages are queued in
/// arrival order until they are collected with `receive_messages`.
pub struct ReliableReceiver<M> {
    /// Index that the front slot of `record` corresponds to; every index that
    /// `sequence_less_than` reports as older than this is dropped.
    oldest_received_message_index: MessageIndex,
    /// One `(index, received)` slot per index in the window. Slots are
    /// contiguous: slot `i` always holds `oldest_received_message_index + i`
    /// (wrapping).
    record: VecDeque<(MessageIndex, bool)>,
    /// Messages accepted since the last call to `receive_messages`.
    incoming_messages: Vec<(MessageIndex, M)>,
}

impl<M> Default for ReliableReceiver<M> {
    /// Equivalent to [`ReliableReceiver::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl<M> ReliableReceiver<M> {
    /// Creates an empty receiver whose window starts at message index `0`.
    pub fn new() -> Self {
        Self {
            oldest_received_message_index: 0,
            record: VecDeque::default(),
            incoming_messages: Vec::default(),
        }
    }

    /// Records the arrival of `message` under `message_index`.
    ///
    /// The message is queued for `receive_messages` unless its index is older
    /// than the window start (per `sequence_less_than`) or has already been
    /// received, in which case it is silently dropped. Indices between the end
    /// of the window and `message_index` get empty placeholder slots so that
    /// later arrivals can fill them in.
    pub(crate) fn buffer_message(&mut self, message_index: MessageIndex, message: M) {
        if sequence_less_than(message_index, self.oldest_received_message_index) {
            // The sliding window has already moved past this index.
            return;
        }

        // Slots are contiguous from the window start, so the wrapping distance
        // from the start is the slot position; no scan of the record is needed.
        let offset =
            usize::from(message_index.wrapping_sub(self.oldest_received_message_index));

        match self.record.get_mut(offset) {
            Some((_, received)) => {
                if *received {
                    // Duplicate of a message that was already accepted.
                    return;
                }
                *received = true;
            }
            None => {
                // Extend the window with "not yet received" slots up to, but
                // not including, the incoming index, then add its own slot.
                let missing = offset - self.record.len();
                self.record.reserve(missing + 1);

                let mut next_index = self
                    .record
                    .back()
                    .map_or(self.oldest_received_message_index, |&(last_index, _)| {
                        last_index.wrapping_add(1)
                    });
                for _ in 0..missing {
                    self.record.push_back((next_index, false));
                    next_index = next_index.wrapping_add(1);
                }
                self.record.push_back((message_index, true));
            }
        }

        self.incoming_messages.push((message_index, message));
        self.clear_old_messages();
    }

    /// Pops every received slot off the front of the window, advancing
    /// `oldest_received_message_index` by one (wrapping) per popped slot.
    /// Stops at the first slot that is still waiting for its message.
    fn clear_old_messages(&mut self) {
        while matches!(self.record.front(), Some((_, true))) {
            self.record.pop_front();
            self.oldest_received_message_index =
                self.oldest_received_message_index.wrapping_add(1);
        }
    }

    /// Returns all messages accepted since the previous call, in the order
    /// they were accepted, leaving the internal queue empty.
    pub(crate) fn receive_messages(&mut self) -> Vec<(MessageIndex, M)> {
        std::mem::take(&mut self.incoming_messages)
    }
}