1
2use std::collections::VecDeque;
3
4use super::{nack::Nack, sequence::*, sequence_buffer::SequenceBuffer};
5
6const RECEIVE_BUFFER_SIZE: u16 = 1024;
7pub const RECEIVE_WINDOW_SIZE_DEFAULT: u16 = 512;
8
9pub struct Receiver {
10 pub is_ordered: bool,
11 pub receive_window_size: u16,
12 pub last_sequence: u16,
13 pub current_sequence: u16,
14 pub buffered: SequenceBuffer<Vec<u8>>,
15 pub published: VecDeque<Vec<u8>>,
16 pub received: SequenceBuffer<bool>,
17 pub resend_list: Vec<u16>,
18 pub nack_list: Vec<Nack>,
19 pub nack_queue: VecDeque<Nack>,
20 pub skipped_sequences: u64,
21}
22
23impl Receiver {
24 pub fn create(is_ordered: bool, receive_window_size: u16) -> Self {
25 let buffered: SequenceBuffer<Vec<u8>> = SequenceBuffer {
26 values: vec![None; RECEIVE_BUFFER_SIZE as usize],
27 partition_by: RECEIVE_BUFFER_SIZE,
28 };
29
30 let received: SequenceBuffer<bool> = SequenceBuffer {
31 values: vec![None; RECEIVE_BUFFER_SIZE as usize],
32 partition_by: RECEIVE_BUFFER_SIZE,
33 };
34
35 let receiver = Receiver {
36 is_ordered,
37 receive_window_size,
38 last_sequence: 0,
39 current_sequence: 0,
40 buffered,
41 published: VecDeque::new(),
42 received,
43 resend_list: Vec::new(),
44 nack_list: Vec::new(),
45 skipped_sequences: 0,
46 nack_queue: VecDeque::new()
47 };
48
49 return receiver;
50 }
51
52 pub fn default(is_ordered: bool) -> Self {
53 return Receiver::create(is_ordered, RECEIVE_WINDOW_SIZE_DEFAULT);
54 }
55
56 pub fn calculate_current_in_window(current: u16, last: u16) -> u16 {
57 if current == last {
58 return current;
59 }
60
61 let mut start: i32 = (last as i32 - RECEIVE_WINDOW_SIZE_DEFAULT as i32) as i32;
62 if start < 0 {
63 start = std::u16::MAX as i32 + start;
64 }
65
66 if Sequence::is_greater_then(start as u16, current) {
67 return start as u16;
68 } else {
69 return current;
70 }
71 }
72 pub fn should_increment_current(current: u16, last: u16, receive_window_size: u16) -> bool {
73 if current == last {
74 return false;
75 }
76
77 let mut start: i32 = (last as i32 - receive_window_size as i32) as i32;
78 if start < 0 {
79 start = std::u16::MAX as i32 + start;
80 }
81
82 if Sequence::is_greater_then(start as u16, current) {
83 return true;
84 } else {
85 return false;
86 }
87 }
88
89 pub fn take_published(&mut self) -> Option<Vec<u8>> {
90 return self.published.pop_front();
91 }
92
93 fn is_buffered(&self, sequence: u16) -> bool {
94 return self.buffered.is_some(sequence);
95 }
96
97 pub fn is_received(&self, sequence: u16) -> bool {
98 return self.received.is_some(sequence);
99 }
100
101 fn set_received(&mut self, sequence: u16) {
102 self.received.insert(sequence, true);
103 }
104
105 fn set_buffered(&mut self, sequence: u16, data: &[u8], length: usize) {
106 let mut buffer: Vec<u8> = vec![0; length];
107 buffer[..].copy_from_slice(&data[0..length]);
108 self.buffered.insert(sequence, buffer);
109 }
110
111 pub fn receive_packet(&mut self, sequence: u16, data: &[u8], length: usize) -> bool {
116 if Receiver::should_increment_current(self.current_sequence, self.last_sequence, self.receive_window_size) {
118 self.received.take(self.current_sequence);
119 self.current_sequence = Sequence::next_sequence(self.current_sequence);
120 self.skipped_sequences += 1;
121 }
122
123 if !Sequence::is_greater_then(sequence, self.current_sequence) {
124 return false;
125 }
126
127 if Sequence::is_greater_then(sequence, self.last_sequence) {
128 self.last_sequence = sequence;
129 }
130
131 let next = Sequence::next_sequence(self.current_sequence);
132 if sequence == next {
133 let last_sequence = self.current_sequence;
134 self.current_sequence = sequence;
135 self.received.remove(last_sequence);
136 }
137
138 if self.is_received(sequence) {
140 return false;
141 } else {
142 self.set_buffered(sequence, data, length);
143 self.set_received(sequence);
144 }
145
146 self.publish();
147
148 return true;
149 }
150
151 pub fn publish(&mut self) {
152 let start = self.current_sequence;
158 let end = Sequence::next_sequence(self.last_sequence);
159 let mut step_sequence = true;
160 let mut seq = start;
161
162 for _ in 0..self.receive_window_size {
163 if self.is_received(seq) {
164 if self.current_sequence == seq {
165 self.received.remove(seq);
166 } else if step_sequence && Sequence::is_greater_then(seq, self.current_sequence) {
167 self.current_sequence = seq;
168 self.received.remove(seq);
169 }
170
171 if self.is_buffered(seq) {
172 match self.buffered.take(seq) {
173 Some(buffer) => {
174 self.published.push_back(buffer);
175 }
176 None => {}
177 }
178 }
179 } else {
180 if self.is_ordered {
181 break;
182 } else {
183 step_sequence = false;
184 }
185 }
186 seq = Sequence::next_sequence(seq);
187 if seq == end {
188 break;
189 }
190 }
191 }
192
193 pub fn set_resend_list(&mut self) {
194 self.resend_list.clear();
195
196 if self.current_sequence == self.last_sequence {
197 return;
198 }
199
200 let start = Sequence::previous_sequence(self.last_sequence);
201 let end = self.current_sequence;
202
203 let mut seq = start;
204
205 for _ in 0..self.receive_window_size {
206 if !self.is_received(seq) {
207 self.resend_list.push(seq);
208 }
209
210 seq = Sequence::previous_sequence(seq);
211 if seq == end {
212 break;
213 }
214 }
215 }
216
217 pub fn create_nacks(&mut self) -> u32 {
218 self.nack_list.clear();
219 self.nack_queue.clear();
220
221 let mut nacked_count = 0;
222 let mut seq = Sequence::previous_sequence(self.last_sequence);
223 if Sequence::is_equal_to_or_less_than(seq, self.current_sequence) {
224 return nacked_count;
225 }
226
227 let count = self.receive_window_size / 32;
228
229 for _ in 0..count {
230
231 if Sequence::is_equal_to_or_less_than(seq, self.current_sequence) {
232 return nacked_count;
233 }
234
235 if self.is_received(seq) {
236 seq = Sequence::previous_sequence(seq);
237 if Sequence::is_equal_to_or_less_than(seq, self.current_sequence) {
238 return nacked_count;
239 }
240 continue;
241 }
242
243 let mut current = Nack::default();
244 current.start_sequence = seq;
245 nacked_count += 1;
246 current.nacked_count = nacked_count;
247
248 for i in 0..32 {
249 seq = Sequence::previous_sequence(seq);
250
251 if Sequence::is_equal_to_or_less_than(seq, self.current_sequence) {
252 self.nack_list.push(current);
253 self.nack_queue.push_back(current);
254 return nacked_count;
255 }
256
257 if !self.is_received(seq) {
258 current.set_bits(i, true);
259 nacked_count += 1;
260 current.nacked_count = nacked_count;
261 }
262 }
263 self.nack_list.push(current);
264 self.nack_queue.push_back(current);
265
266 seq = Sequence::previous_sequence(seq);
267
268 }
269 return nacked_count;
270 }
271
272}
273
274#[cfg(test)]
275mod tests {
276
277 use crate::tachyon::{receiver::*};
278
279 pub fn is_nacked(receiver: &Receiver, sequence: u16) -> bool {
280 for nack in &receiver.nack_list {
281 if nack.is_nacked(sequence) {
282 return true;
283 }
284 }
285 return false;
286 }
287
288 fn assert_nack(receiver: &mut Receiver, sequence: u16) {
289 if receiver.is_received(sequence) || sequence >= receiver.last_sequence || sequence <= receiver.current_sequence {
290 if is_nacked(receiver, sequence) {
291 panic!("{0} is nacked", sequence);
292 } else {
293 }
295 } else {
296 if !is_nacked(receiver,sequence) {
297 panic!("{0} not nacked", sequence);
298 } else {
299 }
301 }
302 }
303
304 #[test]
305 fn test_all_nacked() {
306 let mut channel = Receiver::default(true);
307 channel.current_sequence = 0;
308 channel.last_sequence = 512;
309
310
311 let nack_count = channel.create_nacks();
312 assert_eq!(16, channel.nack_list.len());
313 assert_eq!(511, nack_count);
314
315 for i in 0..512 {
316 assert_nack(&mut channel, i);
317 }
318 }
319
320 #[test]
321 fn test_some_nacked() {
322 let mut channel = Receiver::default(true);
323 channel.current_sequence = 0;
324 channel.last_sequence = 64;
325
326 channel.set_received(63);
327 channel.set_received(63 - 32);
328 channel.set_received(63 - 33);
329 channel.set_received(1);
330 let nacked_count = channel.create_nacks();
331
332 assert_eq!(2, channel.nack_list.len());
333 assert_eq!(63 - 4, nacked_count);
334
335 for i in 0..66 {
336 assert_nack(&mut channel,i);
337
338 }
339 }
340
341 #[test]
342 fn test_skipped() {
343 let mut channel = Receiver::default(true);
344 let data: Vec<u8> = vec![0; 1024];
345 channel.current_sequence = 0;
346 channel.last_sequence = 512 + 10;
347
348 channel.set_received(0);
350 assert!(!channel.receive_packet(1, &data[..], 32));
351 assert!(!channel.is_received(0));
352 assert_eq!(1, channel.current_sequence);
353
354 assert!(!channel.receive_packet(1, &data[..], 32));
355 assert_eq!(2, channel.current_sequence);
356 }
357
358 #[test]
359 fn test_reset_receive_window() {
360 assert_eq!(65530, Receiver::calculate_current_in_window(65530, 100));
361 assert_eq!(0, Receiver::calculate_current_in_window(0, 512));
362 assert_eq!(10, Receiver::calculate_current_in_window(0, 512 + 10));
363 assert_eq!(1, Receiver::calculate_current_in_window(0, 513));
364 assert_eq!(0, Receiver::calculate_current_in_window(65533, 512));
365 }
366
367 #[test]
368 fn wrapping_in_order() {
369 let mut channel = Receiver::default(true);
370 channel.current_sequence = 65533;
371 let data: Vec<u8> = vec![0; 1024];
372
373 let receive_result = channel.receive_packet(65534, &data[..], 32);
374 assert!(receive_result);
375
376 assert_eq!(65534, channel.current_sequence);
377 assert_eq!(1, channel.published.len());
378
379 let receive_result = channel.receive_packet(0, &data[..], 32);
380 assert!(receive_result);
381 assert_eq!(0, channel.current_sequence);
382 assert_eq!(0, channel.last_sequence);
383 assert!((channel.take_published().is_some()));
384
385 let receive_result = channel.receive_packet(1, &data[..], 32);
386 assert!(receive_result);
387 assert_eq!(1, channel.current_sequence);
388 assert!((channel.take_published().is_some()));
389
390 let receive_result = channel.receive_packet(2, &data[..], 32);
391 assert!(receive_result);
392 assert_eq!(2, channel.last_sequence);
393 assert_eq!(2, channel.current_sequence);
394 assert!((channel.take_published().is_some()));
395 }
396
397 #[test]
398 fn wrapping_out_of_order() {
399 let mut channel = Receiver::default(true);
400 channel.current_sequence = 65533;
401 let data: Vec<u8> = vec![0; 1024];
402 let receive_result = channel.receive_packet(65534, &data[..], 32);
403 assert!(receive_result);
404 assert_eq!(65534, channel.current_sequence);
405 let receive_result = channel.receive_packet(2, &data[..], 32);
406 assert!(receive_result);
407 assert_eq!(65534, channel.current_sequence);
408 assert_eq!(2, channel.last_sequence);
409 let receive_result = channel.receive_packet(1, &data[..], 32);
410 assert!(receive_result);
411 assert_eq!(65534, channel.current_sequence);
412
413 let receive_result = channel.receive_packet(0, &data[..], 32);
414 assert!(receive_result);
415 assert_eq!(2, channel.last_sequence);
416 assert_eq!(2, channel.current_sequence);
417 }
418
419
420 #[test]
421 fn full_wrap() {
422 let mut channel = Receiver::default(true);
423 let data: Vec<u8> = vec![0; 1024];
424
425 let mut sequence = 1;
426 for _ in 1..200000 {
427 let _receive_result = channel.receive_packet(sequence, &data[..], 32);
428 if channel.current_sequence != sequence {
429 print!(
430 "{0} {1} {2}\n",
431 sequence, channel.current_sequence, channel.last_sequence
432 );
433 panic!();
434 }
435 assert!(channel.take_published().is_some());
436 sequence = Sequence::next_sequence(sequence);
441 }
442 }
443
444 #[test]
445 fn publish_consume_publish() {
446 let mut channel = Receiver::default(true);
447 let data: Vec<u8> = vec![0; 1024];
448 let _receive_result = channel.receive_packet(1, &data[..], 32);
449 let _receive_result = channel.receive_packet(2, &data[..], 32);
450 assert!((channel.take_published().is_some()));
451 assert!((channel.take_published().is_some()));
452 assert!((channel.take_published().is_none()));
453
454 let _receive_result = channel.receive_packet(4, &data[..], 32);
455 let _receive_result = channel.receive_packet(3, &data[..], 32);
456 assert!((channel.take_published().is_some()));
457 assert!((channel.take_published().is_some()));
458 assert!((channel.take_published().is_none()));
459
460 let _receive_result = channel.receive_packet(5, &data[..], 32);
461 assert!((channel.take_published().is_some()));
462 assert!((channel.take_published().is_none()));
463
464 assert_eq!(0, channel.published.len());
465 }
466
467 #[test]
468 fn receive_older_fails() {
469 let mut channel = Receiver::default(true);
470 let data: Vec<u8> = vec![0; 1024];
471 let receive_result = channel.receive_packet(1, &data[..], 32);
472 assert!(receive_result);
473 let receive_result = channel.receive_packet(1, &data[..], 32);
474 assert!(!receive_result);
475 let receive_result = channel.receive_packet(0, &data[..], 32);
476 assert!(!receive_result);
477 }
478
479 #[test]
480 #[allow(dead_code)]
481 fn ordered_flow_test() {
482 let mut channel = Receiver::default(true);
483 let data: Vec<u8> = vec![0; 1024];
484 let receive_result = channel.receive_packet(1, &data[..], 32);
485 assert!(receive_result);
486 assert_eq!(1, channel.published.len());
487
488 let receive_result = channel.receive_packet(5, &data[..], 32);
489 assert!(receive_result);
490 assert_eq!(1, channel.published.len());
491 assert_eq!(1, channel.current_sequence);
492 assert_eq!(5, channel.last_sequence);
493
494
495 let receive_result = channel.receive_packet(3, &data[..], 32);
496 assert!(receive_result);
497 assert_eq!(1, channel.current_sequence);
498
499 let _receive_result = channel.receive_packet(2, &data[..], 32);
500 assert_eq!(3, channel.current_sequence);
501 assert_eq!(3, channel.published.len());
502
503 let _receive_result = channel.receive_packet(4, &data[..], 32);
504 assert_eq!(5, channel.current_sequence);
505 assert_eq!(5, channel.last_sequence);
506
507
508 assert_eq!(5, channel.published.len());
509
510 assert!(channel.take_published().is_some());
511 assert!(channel.take_published().is_some());
512 assert!(channel.take_published().is_some());
513 assert!(channel.take_published().is_some());
514 assert!(channel.take_published().is_some());
515
516 assert!(channel.take_published().is_none());
517 assert_eq!(0, channel.published.len());
518 }
519
520 #[test]
521 #[allow(dead_code)]
522 fn unordered_flow_test() {
523 let mut channel = Receiver::default(false);
524 let data: Vec<u8> = vec![0; 1024];
525 let _receive_result = channel.receive_packet(1, &data[..], 32);
526 assert_eq!(1, channel.published.len());
527 let _receive_result = channel.receive_packet(5, &data[..], 32);
528 assert_eq!(2, channel.published.len());
529 assert_eq!(1, channel.current_sequence);
530 assert_eq!(5, channel.last_sequence);
531
532
533 let _receive_result = channel.receive_packet(3, &data[..], 32);
534 assert_eq!(1, channel.current_sequence);
535
536
537 let _receive_result = channel.receive_packet(2, &data[..], 32);
538 assert_eq!(3, channel.current_sequence);
539 assert_eq!(4, channel.published.len());
540
541
542 let _receive_result = channel.receive_packet(4, &data[..], 32);
543 assert_eq!(5, channel.current_sequence);
544 assert_eq!(5, channel.last_sequence);
545
546 assert_eq!(5, channel.published.len());
547
548 assert!(channel.take_published().is_some());
549 assert!(channel.take_published().is_some());
550 assert!(channel.take_published().is_some());
551 assert!(channel.take_published().is_some());
552 assert!(channel.take_published().is_some());
553
554 assert!(channel.take_published().is_none());
555 assert_eq!(0, channel.published.len());
556 }
557}