tachyon/tachyon/
receiver.rs

1
2use std::collections::VecDeque;
3
4use super::{nack::Nack, sequence::*, sequence_buffer::SequenceBuffer};
5
6const RECEIVE_BUFFER_SIZE: u16 = 1024;
7pub const RECEIVE_WINDOW_SIZE_DEFAULT: u16 = 512;
8
9pub struct Receiver {
10    pub is_ordered: bool,
11    pub receive_window_size: u16,
12    pub last_sequence: u16,
13    pub current_sequence: u16,
14    pub buffered: SequenceBuffer<Vec<u8>>,
15    pub published: VecDeque<Vec<u8>>,
16    pub received: SequenceBuffer<bool>,
17    pub resend_list: Vec<u16>,
18    pub nack_list: Vec<Nack>,
19    pub nack_queue: VecDeque<Nack>,
20    pub skipped_sequences: u64,
21}
22
23impl Receiver {
24    pub fn create(is_ordered: bool, receive_window_size: u16) -> Self {
25        let buffered: SequenceBuffer<Vec<u8>> = SequenceBuffer {
26            values: vec![None; RECEIVE_BUFFER_SIZE as usize],
27            partition_by: RECEIVE_BUFFER_SIZE,
28        };
29
30        let received: SequenceBuffer<bool> = SequenceBuffer {
31            values: vec![None; RECEIVE_BUFFER_SIZE as usize],
32            partition_by: RECEIVE_BUFFER_SIZE,
33        };
34
35        let receiver = Receiver {
36            is_ordered,
37            receive_window_size,
38            last_sequence: 0,
39            current_sequence: 0,
40            buffered,
41            published: VecDeque::new(),
42            received,
43            resend_list: Vec::new(),
44            nack_list: Vec::new(),
45            skipped_sequences: 0,
46            nack_queue: VecDeque::new()
47        };
48
49        return receiver;
50    }
51
52    pub fn default(is_ordered: bool) -> Self {
53        return Receiver::create(is_ordered, RECEIVE_WINDOW_SIZE_DEFAULT);
54    }
55
56    pub fn calculate_current_in_window(current: u16, last: u16) -> u16 {
57        if current == last {
58            return current;
59        }
60
61        let mut start: i32 = (last as i32 - RECEIVE_WINDOW_SIZE_DEFAULT as i32) as i32;
62        if start < 0 {
63            start = std::u16::MAX as i32 + start;
64        }
65
66        if Sequence::is_greater_then(start as u16, current) {
67            return start as u16;
68        } else {
69            return current;
70        }
71    }
72    pub fn should_increment_current(current: u16, last: u16, receive_window_size: u16) -> bool {
73        if current == last {
74            return false;
75        }
76
77        let mut start: i32 = (last as i32 - receive_window_size as i32) as i32;
78        if start < 0 {
79            start = std::u16::MAX as i32 + start;
80        }
81
82        if Sequence::is_greater_then(start as u16, current) {
83            return true;
84        } else {
85            return false;
86        }
87    }
88
89    pub fn take_published(&mut self) -> Option<Vec<u8>> {
90        return self.published.pop_front();
91    }
92
93    fn is_buffered(&self, sequence: u16) -> bool {
94        return self.buffered.is_some(sequence);
95    }
96
97    pub fn is_received(&self, sequence: u16) -> bool {
98        return self.received.is_some(sequence);
99    }
100
101    fn set_received(&mut self, sequence: u16) {
102        self.received.insert(sequence, true);
103    }
104
105    fn set_buffered(&mut self, sequence: u16, data: &[u8], length: usize) {
106        let mut buffer: Vec<u8> = vec![0; length];
107        buffer[..].copy_from_slice(&data[0..length]);
108        self.buffered.insert(sequence, buffer);
109    }
110
111    // Note:  we use current sequence increments to mark previous as not received.
112    // This forces current to only ever increment by 1.  Ie we can't just adjust our window forward
113    // in big steps for example or we would leave a bunch of entries < current still marked as received.
114
115    pub fn receive_packet(&mut self, sequence: u16, data: &[u8], length: usize) -> bool {
116        // if the difference between current/last is greater then the window, increment current.
117        if Receiver::should_increment_current(self.current_sequence, self.last_sequence, self.receive_window_size) {
118            self.received.take(self.current_sequence);
119            self.current_sequence = Sequence::next_sequence(self.current_sequence);
120            self.skipped_sequences += 1;
121        }
122
123        if !Sequence::is_greater_then(sequence, self.current_sequence) {
124            return false;
125        }
126
127        if Sequence::is_greater_then(sequence, self.last_sequence) {
128            self.last_sequence = sequence;
129        }
130
131        let next = Sequence::next_sequence(self.current_sequence);
132        if sequence == next {
133            let last_sequence = self.current_sequence;
134            self.current_sequence = sequence;
135            self.received.remove(last_sequence);
136        }
137
138        // resends can be higher then current and already received.
139        if self.is_received(sequence) {
140            return false;
141        } else {
142            self.set_buffered(sequence, data, length);
143            self.set_received(sequence);
144        }
145
146        self.publish();
147
148        return true;
149    }
150
151    pub fn publish(&mut self) {
152        // walk from current to last and move buffered into published
153        // increment current sequence until we hit a missing sequence.
154        // on missing, ordered channel breaks out it's done.
155        // unordered channel keep moving buffered to published
156
157        let start = self.current_sequence;
158        let end = Sequence::next_sequence(self.last_sequence);
159        let mut step_sequence = true;
160        let mut seq = start;
161
162        for _ in 0..self.receive_window_size {
163            if self.is_received(seq) {
164                if self.current_sequence == seq {
165                    self.received.remove(seq);
166                } else if step_sequence && Sequence::is_greater_then(seq, self.current_sequence) {
167                    self.current_sequence = seq;
168                    self.received.remove(seq);
169                }
170
171                if self.is_buffered(seq) {
172                    match self.buffered.take(seq) {
173                        Some(buffer) => {
174                            self.published.push_back(buffer);
175                        }
176                        None => {}
177                    }
178                }
179            } else {
180                if self.is_ordered {
181                    break;
182                } else {
183                    step_sequence = false;
184                }
185            }
186            seq = Sequence::next_sequence(seq);
187            if seq == end {
188                break;
189            }
190        }
191    }
192
193    pub fn set_resend_list(&mut self) {
194        self.resend_list.clear();
195
196        if self.current_sequence == self.last_sequence {
197            return;
198        }
199
200        let start = Sequence::previous_sequence(self.last_sequence);
201        let end = self.current_sequence;
202
203        let mut seq = start;
204
205        for _ in 0..self.receive_window_size {
206            if !self.is_received(seq) {
207                self.resend_list.push(seq);
208            }
209
210            seq = Sequence::previous_sequence(seq);
211            if seq == end {
212                break;
213            }
214        }
215    }
216
217    pub fn create_nacks(&mut self) -> u32 {
218        self.nack_list.clear();
219        self.nack_queue.clear();
220
221        let mut nacked_count = 0;
222        let mut seq = Sequence::previous_sequence(self.last_sequence);
223        if Sequence::is_equal_to_or_less_than(seq, self.current_sequence) {
224            return nacked_count;
225        }
226     
227        let count = self.receive_window_size / 32;
228
229        for _ in 0..count {
230
231            if Sequence::is_equal_to_or_less_than(seq, self.current_sequence) {
232                return nacked_count;
233            }
234
235            if self.is_received(seq) {
236                seq = Sequence::previous_sequence(seq);
237                if Sequence::is_equal_to_or_less_than(seq, self.current_sequence) {
238                    return nacked_count;
239                }
240                continue;
241            }
242
243            let mut current = Nack::default();
244            current.start_sequence = seq;
245            nacked_count += 1;
246            current.nacked_count = nacked_count;
247
248            for i in 0..32 {
249                seq = Sequence::previous_sequence(seq);
250
251                if Sequence::is_equal_to_or_less_than(seq, self.current_sequence) {
252                    self.nack_list.push(current);
253                    self.nack_queue.push_back(current);
254                    return nacked_count;
255                }
256    
257                if !self.is_received(seq) {
258                    current.set_bits(i, true);
259                    nacked_count += 1;
260                    current.nacked_count = nacked_count;
261                }
262            }
263            self.nack_list.push(current);
264            self.nack_queue.push_back(current);
265
266            seq = Sequence::previous_sequence(seq);
267            
268        }
269        return nacked_count;
270    }
271
272}
273
274#[cfg(test)]
275mod tests {
276
277    use crate::tachyon::{receiver::*};
278
279    pub fn is_nacked(receiver: &Receiver, sequence: u16) -> bool {
280        for nack in &receiver.nack_list {
281            if nack.is_nacked(sequence) {
282                return true;
283            }
284        }
285        return false;
286    }
287
288    fn assert_nack(receiver: &mut Receiver, sequence: u16) {
289        if receiver.is_received(sequence) || sequence >= receiver.last_sequence || sequence <= receiver.current_sequence {
290            if is_nacked(receiver, sequence) {
291                panic!("{0} is nacked", sequence);
292            } else {
293                //println!("{0} not nacked", sequence);
294            }
295        } else {
296            if !is_nacked(receiver,sequence) {
297                panic!("{0} not nacked", sequence);
298            } else {
299                //println!("{0} nacked", sequence);
300            }
301        }
302    }
303
304    #[test]
305    fn test_all_nacked() {
306        let mut channel = Receiver::default(true);
307        channel.current_sequence = 0;
308        channel.last_sequence = 512;
309        
310
311        let nack_count = channel.create_nacks();
312        assert_eq!(16, channel.nack_list.len());
313        assert_eq!(511, nack_count);
314
315        for i in 0..512 {
316            assert_nack(&mut channel, i);
317        }
318    }
319
320    #[test]
321    fn test_some_nacked() {
322        let mut channel = Receiver::default(true);
323        channel.current_sequence = 0;
324        channel.last_sequence = 64;
325        
326        channel.set_received(63);
327        channel.set_received(63 - 32);
328        channel.set_received(63 - 33);
329        channel.set_received(1);
330        let nacked_count = channel.create_nacks();
331
332        assert_eq!(2, channel.nack_list.len());
333        assert_eq!(63 - 4, nacked_count);
334
335        for i in 0..66 {
336            assert_nack(&mut channel,i);
337            
338        }
339    }
340
341    #[test]
342    fn test_skipped() {
343        let mut channel = Receiver::default(true);
344        let data: Vec<u8> = vec![0; 1024];
345        channel.current_sequence = 0;
346        channel.last_sequence = 512 + 10;
347
348        // should skip and take received
349        channel.set_received(0);
350        assert!(!channel.receive_packet(1, &data[..], 32));
351        assert!(!channel.is_received(0));
352        assert_eq!(1, channel.current_sequence);
353
354        assert!(!channel.receive_packet(1, &data[..], 32));
355        assert_eq!(2, channel.current_sequence);
356    }
357
358    #[test]
359    fn test_reset_receive_window() {
360        assert_eq!(65530, Receiver::calculate_current_in_window(65530, 100));
361        assert_eq!(0, Receiver::calculate_current_in_window(0, 512));
362        assert_eq!(10, Receiver::calculate_current_in_window(0, 512 + 10));
363        assert_eq!(1, Receiver::calculate_current_in_window(0, 513));
364        assert_eq!(0, Receiver::calculate_current_in_window(65533, 512));
365    }
366
367    #[test]
368    fn wrapping_in_order() {
369        let mut channel = Receiver::default(true);
370        channel.current_sequence = 65533;
371        let data: Vec<u8> = vec![0; 1024];
372
373        let receive_result = channel.receive_packet(65534, &data[..], 32);
374        assert!(receive_result);
375
376        assert_eq!(65534, channel.current_sequence);
377        assert_eq!(1, channel.published.len());
378
379        let receive_result = channel.receive_packet(0, &data[..], 32);
380        assert!(receive_result);
381        assert_eq!(0, channel.current_sequence);
382        assert_eq!(0, channel.last_sequence);
383        assert!((channel.take_published().is_some()));
384
385        let receive_result = channel.receive_packet(1, &data[..], 32);
386        assert!(receive_result);
387        assert_eq!(1, channel.current_sequence);
388        assert!((channel.take_published().is_some()));
389
390        let receive_result = channel.receive_packet(2, &data[..], 32);
391        assert!(receive_result);
392        assert_eq!(2, channel.last_sequence);
393        assert_eq!(2, channel.current_sequence);
394        assert!((channel.take_published().is_some()));
395    }
396
397    #[test]
398    fn wrapping_out_of_order() {
399        let mut channel = Receiver::default(true);
400        channel.current_sequence = 65533;
401        let data: Vec<u8> = vec![0; 1024];
402        let receive_result = channel.receive_packet(65534, &data[..], 32);
403        assert!(receive_result);
404        assert_eq!(65534, channel.current_sequence);
405        let receive_result = channel.receive_packet(2, &data[..], 32);
406        assert!(receive_result);
407        assert_eq!(65534, channel.current_sequence);
408        assert_eq!(2, channel.last_sequence);
409        let receive_result = channel.receive_packet(1, &data[..], 32);
410        assert!(receive_result);
411        assert_eq!(65534, channel.current_sequence);
412
413        let receive_result = channel.receive_packet(0, &data[..], 32);
414        assert!(receive_result);
415        assert_eq!(2, channel.last_sequence);
416        assert_eq!(2, channel.current_sequence);
417    }
418
419  
420    #[test]
421    fn full_wrap() {
422        let mut channel = Receiver::default(true);
423        let data: Vec<u8> = vec![0; 1024];
424
425        let mut sequence = 1;
426        for _ in 1..200000 {
427            let _receive_result = channel.receive_packet(sequence, &data[..], 32);
428            if channel.current_sequence != sequence {
429                print!(
430                    "{0} {1} {2}\n",
431                    sequence, channel.current_sequence, channel.last_sequence
432                );
433                panic!();
434            }
435            assert!(channel.take_published().is_some());
436            // if channel.take_published().is_none() {
437            //     print!("{0} {1} {2}\n", sequence, channel.current_sequence, channel.last_sequence);
438            //     panic!();
439            // }
440            sequence = Sequence::next_sequence(sequence);
441        }
442    }
443
444    #[test]
445    fn publish_consume_publish() {
446        let mut channel = Receiver::default(true);
447        let data: Vec<u8> = vec![0; 1024];
448        let _receive_result = channel.receive_packet(1, &data[..], 32);
449        let _receive_result = channel.receive_packet(2, &data[..], 32);
450        assert!((channel.take_published().is_some()));
451        assert!((channel.take_published().is_some()));
452        assert!((channel.take_published().is_none()));
453
454        let _receive_result = channel.receive_packet(4, &data[..], 32);
455        let _receive_result = channel.receive_packet(3, &data[..], 32);
456        assert!((channel.take_published().is_some()));
457        assert!((channel.take_published().is_some()));
458        assert!((channel.take_published().is_none()));
459
460        let _receive_result = channel.receive_packet(5, &data[..], 32);
461        assert!((channel.take_published().is_some()));
462        assert!((channel.take_published().is_none()));
463
464        assert_eq!(0, channel.published.len());
465    }
466
467    #[test]
468    fn receive_older_fails() {
469        let mut channel = Receiver::default(true);
470        let data: Vec<u8> = vec![0; 1024];
471        let receive_result = channel.receive_packet(1, &data[..], 32);
472        assert!(receive_result);
473        let receive_result = channel.receive_packet(1, &data[..], 32);
474        assert!(!receive_result);
475        let receive_result = channel.receive_packet(0, &data[..], 32);
476        assert!(!receive_result);
477    }
478
479    #[test]
480    #[allow(dead_code)]
481    fn ordered_flow_test() {
482        let mut channel = Receiver::default(true);
483        let data: Vec<u8> = vec![0; 1024];
484        let receive_result = channel.receive_packet(1, &data[..], 32);
485        assert!(receive_result);
486        assert_eq!(1, channel.published.len());
487
488        let receive_result = channel.receive_packet(5, &data[..], 32);
489        assert!(receive_result);
490        assert_eq!(1, channel.published.len());
491        assert_eq!(1, channel.current_sequence);
492        assert_eq!(5, channel.last_sequence);
493       
494
495        let receive_result = channel.receive_packet(3, &data[..], 32);
496        assert!(receive_result);
497        assert_eq!(1, channel.current_sequence);
498
499        let _receive_result = channel.receive_packet(2, &data[..], 32);
500        assert_eq!(3, channel.current_sequence);
501        assert_eq!(3, channel.published.len());
502
503        let _receive_result = channel.receive_packet(4, &data[..], 32);
504        assert_eq!(5, channel.current_sequence);
505        assert_eq!(5, channel.last_sequence);
506       
507
508        assert_eq!(5, channel.published.len());
509
510        assert!(channel.take_published().is_some());
511        assert!(channel.take_published().is_some());
512        assert!(channel.take_published().is_some());
513        assert!(channel.take_published().is_some());
514        assert!(channel.take_published().is_some());
515
516        assert!(channel.take_published().is_none());
517        assert_eq!(0, channel.published.len());
518    }
519
520    #[test]
521    #[allow(dead_code)]
522    fn unordered_flow_test() {
523        let mut channel = Receiver::default(false);
524        let data: Vec<u8> = vec![0; 1024];
525        let _receive_result = channel.receive_packet(1, &data[..], 32);
526        assert_eq!(1, channel.published.len());
527        let _receive_result = channel.receive_packet(5, &data[..], 32);
528        assert_eq!(2, channel.published.len());
529        assert_eq!(1, channel.current_sequence);
530        assert_eq!(5, channel.last_sequence);
531       
532
533        let _receive_result = channel.receive_packet(3, &data[..], 32);
534        assert_eq!(1, channel.current_sequence);
535
536
537        let _receive_result = channel.receive_packet(2, &data[..], 32);
538        assert_eq!(3, channel.current_sequence);
539        assert_eq!(4, channel.published.len());
540
541
542        let _receive_result = channel.receive_packet(4, &data[..], 32);
543        assert_eq!(5, channel.current_sequence);
544        assert_eq!(5, channel.last_sequence);
545       
546        assert_eq!(5, channel.published.len());
547
548        assert!(channel.take_published().is_some());
549        assert!(channel.take_published().is_some());
550        assert!(channel.take_published().is_some());
551        assert!(channel.take_published().is_some());
552        assert!(channel.take_published().is_some());
553
554        assert!(channel.take_published().is_none());
555        assert_eq!(0, channel.published.len());
556    }
557}