1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
use std::collections::{HashMap, VecDeque};
use naia_shared::{
sequence_greater_than, BitReader, LocalEntityAndGlobalEntityConverter, MessageContainer,
MessageKinds, Serde, SerdeErr, ShortMessageIndex, Tick, TickBufferSettings,
UnsignedVariableInteger,
};
/// Receive updates from the client and store them in a buffer along with the corresponding
/// client tick.
pub struct TickBufferReceiverChannel {
incoming_messages: IncomingMessages,
}
impl TickBufferReceiverChannel {
pub fn new(_settings: TickBufferSettings) -> Self {
Self {
incoming_messages: IncomingMessages::new(),
}
}
/// Read the stored buffer-data corresponding to the given [`Tick`]
pub fn receive_messages(&mut self, host_tick: &Tick) -> Vec<MessageContainer> {
self.incoming_messages.collect(host_tick)
}
/// Given incoming packet data, read transmitted Messages and store
/// them in a buffer to be returned to the application
pub fn read_messages(
&mut self,
converter: &dyn LocalEntityAndGlobalEntityConverter,
message_kinds: &MessageKinds,
host_tick: &Tick,
remote_tick: &Tick,
reader: &mut BitReader,
) -> Result<(), SerdeErr> {
let mut last_read_tick = *remote_tick;
loop {
let message_continue = bool::de(reader)?;
if !message_continue {
break;
}
self.read_message(
message_kinds,
host_tick,
&mut last_read_tick,
converter,
reader,
)?;
}
Ok(())
}
/// Given incoming packet data, read transmitted Message and store
/// them to be returned to the application
fn read_message(
&mut self,
message_kinds: &MessageKinds,
host_tick: &Tick,
last_read_tick: &mut Tick,
entity_converter: &dyn LocalEntityAndGlobalEntityConverter,
reader: &mut BitReader,
) -> Result<(), SerdeErr> {
// read remote tick
let remote_tick_diff = UnsignedVariableInteger::<3>::de(reader)?.get() as Tick;
*last_read_tick = last_read_tick.wrapping_sub(remote_tick_diff);
let remote_tick = *last_read_tick;
// read message count
let message_count = UnsignedVariableInteger::<3>::de(reader)?.get();
let mut last_read_message_index: ShortMessageIndex = 0;
for _ in 0..message_count {
// read message id diff, add to last read id
let id_diff = UnsignedVariableInteger::<2>::de(reader)?.get() as ShortMessageIndex;
let message_index: ShortMessageIndex = last_read_message_index + id_diff;
last_read_message_index = message_index;
// read payload
let new_message = message_kinds.read(reader, entity_converter)?;
if !self
.incoming_messages
.insert(host_tick, &remote_tick, message_index, new_message)
{
// Failed to Insert Command
}
}
Ok(())
}
}
// Incoming messages
struct IncomingMessages {
// front is small, back is big
// front is present, back is future
/// Buffer containing messages from the client, along with the corresponding tick
/// We do not store anything for empty ticks
buffer: VecDeque<(Tick, HashMap<ShortMessageIndex, MessageContainer>)>,
}
impl IncomingMessages {
pub fn new() -> Self {
Self {
buffer: VecDeque::new(),
}
}
/// Insert a message from the client into the tick-buffer
/// Will only insert messages that are from future ticks compared to the current server tick
pub fn insert(
&mut self,
host_tick: &Tick,
message_tick: &Tick,
message_index: ShortMessageIndex, // this is used to de-dupe messages
new_message: MessageContainer,
) -> bool {
// TODO:
// * add unit test?
// * should there be a maximum buffer size?
if sequence_greater_than(*message_tick, *host_tick) {
let mut index = self.buffer.len();
//in the case of empty vec
if index == 0 {
let mut map = HashMap::new();
map.insert(message_index, new_message);
self.buffer.push_back((*message_tick, map));
return true;
}
let mut insert = false;
// loop from back to front (future to present)
loop {
index -= 1;
if let Some((existing_tick, existing_messages)) = self.buffer.get_mut(index) {
if *existing_tick == *message_tick {
// should almost never collide
if let std::collections::hash_map::Entry::Vacant(e) =
existing_messages.entry(message_index)
{
e.insert(new_message);
return true;
} else {
// TODO: log hash collisions?
return false;
}
} else if sequence_greater_than(*message_tick, *existing_tick) {
// incoming client tick is larger (more in the future) than found tick
insert = true;
}
}
if insert {
// found correct position to insert node
let mut new_messages = HashMap::new();
new_messages.insert(message_index, new_message);
self.buffer.insert(index + 1, (*message_tick, new_messages));
return true;
}
if index == 0 {
//traversed the whole vec, push front
let mut new_messages = HashMap::new();
new_messages.insert(message_index, new_message);
self.buffer.push_front((*message_tick, new_messages));
return true;
}
}
} else {
// command is too late to insert in incoming message queue
false
}
}
/// Delete from the buffer all data that is older than the provided [`Tick`]
fn prune_outdated_commands(&mut self, host_tick: &Tick) {
loop {
let mut pop = false;
if let Some((front_tick, _)) = self.buffer.front() {
if sequence_greater_than(*host_tick, *front_tick) {
pop = true;
}
}
if pop {
self.buffer.pop_front();
} else {
break;
}
}
}
/// Retrieve from the buffer data corresponding to the provided [`Tick`]
pub fn collect(&mut self, host_tick: &Tick) -> Vec<MessageContainer> {
self.prune_outdated_commands(host_tick);
// now get the newest applicable command
let mut output = Vec::new();
let mut pop = false;
if let Some((front_tick, _)) = self.buffer.front() {
if *front_tick == *host_tick {
pop = true;
}
}
if pop {
if let Some((_, mut command_map)) = self.buffer.pop_front() {
for (_, message) in command_map.drain() {
output.push(message);
}
}
}
output
}
}