naia_shared/messages/channels/receivers/
ordered_reliable_receiver.rs1use std::collections::VecDeque;
2
3use crate::{
4 messages::channels::receivers::reliable_message_receiver::{
5 ReceiverArranger, ReliableMessageReceiver,
6 },
7 types::MessageIndex,
8 MessageContainer,
9};
10
11pub type OrderedReliableReceiver = ReliableMessageReceiver<OrderedArranger>;
13
14impl OrderedReliableReceiver {
15 pub fn new() -> Self {
16 Self::with_arranger(OrderedArranger {
17 messages_received: 0,
18 buffer: VecDeque::new(),
19 })
20 }
21}
22
23enum MessageSlot {
24 NotReceived,
25 Received(MessageContainer),
26 PreviousFragment,
27}
28
29impl MessageSlot {
30 fn is_not_received(&self) -> bool {
31 match self {
32 MessageSlot::NotReceived => true,
33 _ => false,
34 }
35 }
36}
37
38pub struct OrderedArranger {
40 buffer: VecDeque<(MessageIndex, MessageSlot)>,
41 messages_received: MessageIndex,
42}
43
44impl ReceiverArranger for OrderedArranger {
45 fn process(
46 &mut self,
47 start_message_index: MessageIndex,
48 end_message_index: MessageIndex,
49 message: MessageContainer,
50 ) -> Vec<MessageContainer> {
51 let mut output = Vec::new();
52 let mut current_index = 0;
53
54 loop {
56 if current_index < self.buffer.len() {
57 let Some((old_message_index, old_message)) = self.buffer.get_mut(current_index)
58 else {
59 panic!(
60 "Buffer should be instantiated to slot {:?} !",
61 start_message_index
62 );
63 };
64 let old_message_index = *old_message_index;
65 if old_message_index == start_message_index {
66 if old_message.is_not_received() {
67 *old_message = MessageSlot::Received(message);
68
69 let mut current_message_index = start_message_index;
70 while current_message_index != end_message_index {
71 current_index = current_index.wrapping_add(1);
72 let Some((old_message_index, old_message)) =
73 self.buffer.get_mut(current_index)
74 else {
75 panic!(
76 "Buffer should be instantiated to slot {:?} !",
77 old_message_index
78 );
79 };
80 let old_message_index = *old_message_index;
81 current_message_index = old_message_index;
82 if old_message.is_not_received() {
83 *old_message = MessageSlot::PreviousFragment;
84 } else {
85 panic!(
86 "Buffer should not have received message in slot {:?} !",
87 old_message_index
88 );
89 }
90 }
91
92 break;
93 } else {
94 panic!(
95 "Buffer should not have received message in slot {:?} !",
96 old_message_index
97 );
98 }
99 }
100 } else {
101 let next_message_index = self.messages_received.wrapping_add(current_index as u16);
102
103 if next_message_index == start_message_index {
104 self.buffer
105 .push_back((next_message_index, MessageSlot::Received(message)));
106
107 let mut next_message_index = next_message_index;
108 while next_message_index != end_message_index {
109 next_message_index = next_message_index.wrapping_add(1);
110 self.buffer
111 .push_back((next_message_index, MessageSlot::PreviousFragment));
112 }
113
114 break;
115 } else {
116 self.buffer
117 .push_back((next_message_index, MessageSlot::NotReceived));
118 }
120 }
121
122 current_index += 1;
123 }
124
125 loop {
127 let Some((_, MessageSlot::Received(_))) = self.buffer.front() else {
128 return output;
130 };
131 let Some((_, MessageSlot::Received(message))) = self.buffer.pop_front() else {
132 panic!("shouldn't be possible due to above check");
133 };
134
135 output.push(message);
136 self.messages_received = self.messages_received.wrapping_add(1);
137
138 while let Some((_, MessageSlot::PreviousFragment)) = self.buffer.front() {
139 self.messages_received = self.messages_received.wrapping_add(1);
140 self.buffer.pop_front();
141 }
142 }
143 }
144}