naia_shared/messages/channels/receivers/
ordered_reliable_receiver.rs

1use std::collections::VecDeque;
2
3use crate::{
4    messages::channels::receivers::reliable_message_receiver::{
5        ReceiverArranger, ReliableMessageReceiver,
6    },
7    types::MessageIndex,
8    MessageContainer,
9};
10
11// OrderedReliableReceiver
12pub type OrderedReliableReceiver = ReliableMessageReceiver<OrderedArranger>;
13
14impl OrderedReliableReceiver {
15    pub fn new() -> Self {
16        Self::with_arranger(OrderedArranger {
17            messages_received: 0,
18            buffer: VecDeque::new(),
19        })
20    }
21}
22
23enum MessageSlot {
24    NotReceived,
25    Received(MessageContainer),
26    PreviousFragment,
27}
28
29impl MessageSlot {
30    fn is_not_received(&self) -> bool {
31        match self {
32            MessageSlot::NotReceived => true,
33            _ => false,
34        }
35    }
36}
37
38// OrderedArranger
39pub struct OrderedArranger {
40    buffer: VecDeque<(MessageIndex, MessageSlot)>,
41    messages_received: MessageIndex,
42}
43
44impl ReceiverArranger for OrderedArranger {
45    fn process(
46        &mut self,
47        start_message_index: MessageIndex,
48        end_message_index: MessageIndex,
49        message: MessageContainer,
50    ) -> Vec<MessageContainer> {
51        let mut output = Vec::new();
52        let mut current_index = 0;
53
54        // Put message where it needs to go in buffer
55        loop {
56            if current_index < self.buffer.len() {
57                let Some((old_message_index, old_message)) = self.buffer.get_mut(current_index)
58                else {
59                    panic!(
60                        "Buffer should be instantiated to slot {:?} !",
61                        start_message_index
62                    );
63                };
64                let old_message_index = *old_message_index;
65                if old_message_index == start_message_index {
66                    if old_message.is_not_received() {
67                        *old_message = MessageSlot::Received(message);
68
69                        let mut current_message_index = start_message_index;
70                        while current_message_index != end_message_index {
71                            current_index = current_index.wrapping_add(1);
72                            let Some((old_message_index, old_message)) =
73                                self.buffer.get_mut(current_index)
74                            else {
75                                panic!(
76                                    "Buffer should be instantiated to slot {:?} !",
77                                    old_message_index
78                                );
79                            };
80                            let old_message_index = *old_message_index;
81                            current_message_index = old_message_index;
82                            if old_message.is_not_received() {
83                                *old_message = MessageSlot::PreviousFragment;
84                            } else {
85                                panic!(
86                                    "Buffer should not have received message in slot {:?} !",
87                                    old_message_index
88                                );
89                            }
90                        }
91
92                        break;
93                    } else {
94                        panic!(
95                            "Buffer should not have received message in slot {:?} !",
96                            old_message_index
97                        );
98                    }
99                }
100            } else {
101                let next_message_index = self.messages_received.wrapping_add(current_index as u16);
102
103                if next_message_index == start_message_index {
104                    self.buffer
105                        .push_back((next_message_index, MessageSlot::Received(message)));
106
107                    let mut next_message_index = next_message_index;
108                    while next_message_index != end_message_index {
109                        next_message_index = next_message_index.wrapping_add(1);
110                        self.buffer
111                            .push_back((next_message_index, MessageSlot::PreviousFragment));
112                    }
113
114                    break;
115                } else {
116                    self.buffer
117                        .push_back((next_message_index, MessageSlot::NotReceived));
118                    // keep filling up buffer
119                }
120            }
121
122            current_index += 1;
123        }
124
125        // Pop messages out in order
126        loop {
127            let Some((_, MessageSlot::Received(_))) = self.buffer.front() else {
128                // no more messages, return
129                return output;
130            };
131            let Some((_, MessageSlot::Received(message))) = self.buffer.pop_front() else {
132                panic!("shouldn't be possible due to above check");
133            };
134
135            output.push(message);
136            self.messages_received = self.messages_received.wrapping_add(1);
137
138            while let Some((_, MessageSlot::PreviousFragment)) = self.buffer.front() {
139                self.messages_received = self.messages_received.wrapping_add(1);
140                self.buffer.pop_front();
141            }
142        }
143    }
144}