1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
use std::collections::VecDeque;
use naia_serde::BitReader;
use crate::{types::MessageId, wrapping_number::sequence_less_than};
use super::{
message_channel::{ChannelReader, ChannelReceiver},
reliable_receiver::ReliableReceiver,
};
/// Receiving side of an ordered reliable channel: buffers incoming messages by
/// `MessageId` and releases them only in contiguous id order.
pub struct OrderedReliableReceiver<P> {
/// Id of the next message to be released; `buffer_message` discards ids that
/// `sequence_less_than` orders before this one.
oldest_waiting_message_id: MessageId,
/// Pending slots in id order, starting at `oldest_waiting_message_id`.
/// A `None` payload marks an id that has not arrived yet.
waiting_incoming_messages: VecDeque<(MessageId, Option<P>)>,
}
impl<P> OrderedReliableReceiver<P> {
    /// Creates an empty receiver that expects message id `0` first.
    pub fn new() -> Self {
        Self {
            oldest_waiting_message_id: 0,
            waiting_incoming_messages: VecDeque::new(),
        }
    }

    /// Stores `message` under `message_id` until it can be released in order.
    ///
    /// * Ids that `sequence_less_than` orders before the oldest awaited id are
    ///   dropped (they were already delivered).
    /// * If the id is already buffered with a payload, the duplicate is dropped
    ///   and the first payload is kept.
    /// * If the id lies beyond the buffered window, the gap is filled with
    ///   empty placeholder slots so later arrivals can fill them in.
    pub fn buffer_message(&mut self, message_id: MessageId, message: P) {
        if sequence_less_than(message_id, self.oldest_waiting_message_id) {
            return;
        }

        // Slot `i` always holds id `oldest + i` (wrapping), so the target slot
        // is found by subtraction instead of scanning the whole buffer.
        let offset = usize::from(message_id.wrapping_sub(self.oldest_waiting_message_id));

        if let Some((_, slot)) = self.waiting_incoming_messages.get_mut(offset) {
            // Only fill an empty placeholder; a duplicate never overwrites.
            if slot.is_none() {
                *slot = Some(message);
            }
            return;
        }

        // The id is past the end of the buffer: pad with placeholders for every
        // id between the current tail and `message_id`, then append the message.
        let mut next_id = self
            .waiting_incoming_messages
            .back()
            .map_or(self.oldest_waiting_message_id, |(last_id, _)| {
                last_id.wrapping_add(1)
            });
        while next_id != message_id {
            self.waiting_incoming_messages.push_back((next_id, None));
            next_id = next_id.wrapping_add(1);
        }
        self.waiting_incoming_messages
            .push_back((message_id, Some(message)));
    }

    /// Removes and returns every message that is ready for delivery, in id order.
    ///
    /// Draining stops at the first slot whose message has not arrived yet (or
    /// when the buffer is empty). The oldest awaited id advances by one, with
    /// wrap-around, for each message returned.
    pub fn receive_messages(&mut self) -> Vec<P> {
        let mut output = Vec::new();
        while matches!(
            self.waiting_incoming_messages.front(),
            Some((_, Some(_)))
        ) {
            // The front slot was just checked to hold a payload, so this pops it.
            if let Some((_, Some(message))) = self.waiting_incoming_messages.pop_front() {
                output.push(message);
                self.oldest_waiting_message_id = self.oldest_waiting_message_id.wrapping_add(1);
            }
        }
        output
    }
}
impl<P: Send + Sync> ChannelReceiver<P> for OrderedReliableReceiver<P> {
    /// Reads the incoming `(id, message)` pairs through
    /// `ReliableReceiver::read_incoming_messages` and buffers each of them.
    fn read_messages(&mut self, channel_reader: &dyn ChannelReader<P>, bit_reader: &mut BitReader) {
        ReliableReceiver::read_incoming_messages(channel_reader, bit_reader)
            .into_iter()
            .for_each(|(message_id, payload)| self.buffer_message(message_id, payload));
    }

    /// Returns the messages that are ready for delivery, in id order.
    fn receive_messages(&mut self) -> Vec<P> {
        // Method-call syntax resolves to the inherent `receive_messages` ahead
        // of this trait method, so this delegates rather than recursing.
        self.receive_messages()
    }
}