laminar/infrastructure/arranging/sequencing.rs
//! Module with logic for arranging items in-sequence on multiple streams.
//!
//! "_Sequencing is the process of only caring about the newest items._"
//!
//! With sequencing, we only care about the newest items. When old items arrive, we simply toss them away.
//!
//! Example: the sequence `1,3,2,5,4` results in `1,3,5`.
//!
//! # Remarks
//! - See [super-module](../index.html) description for more details.
11
12use std::{collections::HashMap, marker::PhantomData};
13
14use crate::packet::SequenceNumber;
15
16use super::{Arranging, ArrangingSystem};
17
18/// A sequencing system that can arrange items in sequence across different streams.
19///
20/// Checkout [`SequencingStream`](./struct.SequencingStream.html), or module description for more details.
21///
22/// # Remarks
23/// - See [super-module](../index.html) for more information about streams.
24pub struct SequencingSystem<T> {
25 // '[HashMap]' with streams on which items can be arranged in-sequence.
26 streams: HashMap<u8, SequencingStream<T>>,
27}
28
29impl<T> SequencingSystem<T> {
30 /// Constructs a new [`SequencingSystem`](./struct.SequencingSystem.html).
31 pub fn new() -> SequencingSystem<T> {
32 SequencingSystem {
33 streams: HashMap::with_capacity(32),
34 }
35 }
36}
37
38impl<T> ArrangingSystem for SequencingSystem<T> {
39 type Stream = SequencingStream<T>;
40
41 /// Returns the number of sequencing streams currently created.
42 fn stream_count(&self) -> usize {
43 self.streams.len()
44 }
45
46 /// Tries to get an [`SequencingStream`](./struct.SequencingStream.html) by `stream_id`.
47 /// When the stream does not exist, it will be inserted by the given `stream_id` and returned.
48 fn get_or_create_stream(&mut self, stream_id: u8) -> &mut Self::Stream {
49 self.streams
50 .entry(stream_id)
51 .or_insert_with(|| SequencingStream::new(stream_id))
52 }
53}
54
/// A stream on which items will be arranged in-sequence.
///
/// # Algorithm
///
/// The stream remembers `top_index`, the index of the most recent item it has accepted.
/// Every incoming item carries an `incoming_index`, which is compared against `top_index`
/// using wrapping `u16` arithmetic, so the comparison keeps working when indices roll over.
///
/// There are two scenarios that are important to us.
/// 1. `incoming_index` is equal to, or ahead of, `top_index`.
///    This item is the newest or newer than the last one we have seen.
///    Because of that we should return it back to the user.
/// 2. `incoming_index` is behind `top_index`.
///    This item is older than the newest item we have seen so far.
///    Since we don't care about old items we can toss it away.
///
/// # Remarks
/// - See [super-module](../index.html) for more information about streams.
#[derive(Debug)]
pub struct SequencingStream<T> {
    // The id of this stream.
    _stream_id: u8,
    // The index of the most recently accepted item.
    top_index: u16,
    // Needs `PhantomData`, otherwise `T` would be unconstrained and could not be used as the item type in the
    // `Arranging` implementation.
    phantom: PhantomData<T>,
    // Next identifier to hand out for items sent on this stream; the other side (e.g. the remote endpoint) uses it
    // to arrange those items.
    unique_item_identifier: u16,
}
81
82impl<T> SequencingStream<T> {
83 /// Constructs a new, empty '[SequencingStream](./struct.SequencingStream.html)'.
84 ///
85 /// The default stream will have a capacity of 32 items.
86 pub fn new(stream_id: u8) -> SequencingStream<T> {
87 SequencingStream {
88 _stream_id: stream_id,
89 top_index: 0,
90 phantom: PhantomData,
91 unique_item_identifier: 0,
92 }
93 }
94
95 /// Returns the identifier of this stream.
96 #[cfg(test)]
97 pub fn stream_id(&self) -> u8 {
98 self._stream_id
99 }
100
101 /// Returns the unique identifier which should be used for ordering on an other stream e.g. the remote endpoint.
102 pub fn new_item_identifier(&mut self) -> SequenceNumber {
103 let id = self.unique_item_identifier;
104 self.unique_item_identifier = self.unique_item_identifier.wrapping_add(1);
105 id
106 }
107}
108
/// Returns `true` when `incoming` is equal to, or ahead of, `start` in wrapping `u16` space.
///
/// The forward distance is computed as `incoming.wrapping_sub(start)`, so it stays meaningful when the
/// counter rolls over from `u16::MAX` to `0`. A distance of at most `u16::MAX / 2 + 1` (32768) counts as
/// "within the window"; any larger distance means `incoming` lies behind `start`.
fn is_u16_within_half_window_from_start(start: u16, incoming: u16) -> bool {
    // Largest wrapping forward distance that is still accepted.
    const HALF_WINDOW: u16 = u16::MAX / 2 + 1;

    incoming.wrapping_sub(start) <= HALF_WINDOW
}
114
115impl<T> Arranging for SequencingStream<T> {
116 type ArrangingItem = T;
117
118 /// Arranges the given item based on a sequencing algorithm.
119 ///
120 /// With every sequencing operation an `top_index` is given.
121 ///
122 /// # Algorithm
123 ///
124 /// There are two scenarios that are important to us.
125 /// 1. `incoming_index` >= `top_index`.
126 /// This item is the newest or newer than the last one we have seen.
127 /// Because of that we should return it back to the user.
128 /// 2. `incoming_index` < `top_index`.
129 /// This item is older than we the newest packet we have seen so far.
130 /// Since we don't care about old items we can toss it a way.
131 ///
132 /// # Remark
133 /// - All old packets will be tossed away.
134 /// - None is returned when an old packet is received.
135 fn arrange(
136 &mut self,
137 incoming_index: u16,
138 item: Self::ArrangingItem,
139 ) -> Option<Self::ArrangingItem> {
140 if is_u16_within_half_window_from_start(self.top_index, incoming_index) {
141 self.top_index = incoming_index;
142 return Some(item);
143 }
144 None
145 }
146}
147
#[cfg(test)]
mod tests {
    use super::{Arranging, ArrangingSystem, SequencingSystem};

    /// Minimal item type used to drive the sequencing streams in these tests.
    #[derive(Debug, PartialEq, Clone)]
    struct Packet {
        pub sequence: u16,
        pub ordering_stream: u8,
    }

    impl Packet {
        fn new(sequence: u16, ordering_stream: u8) -> Packet {
            Packet {
                sequence,
                ordering_stream,
            }
        }
    }

    #[test]
    fn create_stream() {
        let mut system: SequencingSystem<Packet> = SequencingSystem::new();
        let stream = system.get_or_create_stream(1);

        assert_eq!(stream.stream_id(), 1);
    }

    #[test]
    fn create_existing_stream() {
        let mut system: SequencingSystem<Packet> = SequencingSystem::new();

        system.get_or_create_stream(1);
        let stream = system.get_or_create_stream(1);

        assert_eq!(stream.stream_id(), 1);
        // Asking for the same id twice must reuse the stream instead of adding a second one.
        assert_eq!(system.stream_count(), 1);
    }

    /// Asserts that the given collection, on the left, results - after it is sequenced on the stream with the
    /// given id - in the given collection, on the right.
    macro_rules! assert_sequence {
        ( [$( $x:expr ),*], [$( $y:expr),*], $stream_id:expr) => {
            {
                // sequence numbers in arrival order.
                let before = [$($x,)*];

                // sequence numbers expected to survive sequencing.
                let after = [$($y,)*];

                // evaluate the stream id once so the stream and the packets agree on it.
                let stream_id: u8 = $stream_id;

                // create system to handle sequenced packets.
                let mut sequence_system = SequencingSystem::<Packet>::new();

                // get the requested stream to process the sequenced packets on.
                let stream = sequence_system.get_or_create_stream(stream_id);
                assert_eq!(stream.stream_id(), stream_id);

                // get packets arranged in sequence; packets that are tossed away yield `None` and are filtered out.
                let sequenced_packets: Vec<_> = before
                    .iter()
                    .filter_map(|&seq| {
                        stream
                            .arrange(seq, Packet::new(seq, stream_id))
                            .map(|p| p.sequence)
                    })
                    .collect();

                // assert that the expected range equals the processed range which is in sequence.
                assert_eq!(after.to_vec(), sequenced_packets);
            }
        };
    }

    // This will assert a bunch of ranges to a correct sequenced range.
    #[test]
    fn can_sequence() {
        assert_sequence!([1, 3, 5, 4, 2], [1, 3, 5], 1);
        assert_sequence!([1, 5, 4, 3, 2], [1, 5], 1);
        assert_sequence!([5, 3, 4, 2, 1], [5], 1);
        assert_sequence!([4, 3, 2, 1, 5], [4, 5], 1);
        assert_sequence!([2, 1, 4, 3, 5], [2, 4, 5], 1);
        assert_sequence!([5, 2, 1, 4, 3], [5], 1);
        assert_sequence!([3, 2, 4, 1, 5], [3, 4, 5], 1);
    }

    // This will assert a bunch of ranges to a correct sequenced range, each on its own stream id.
    #[test]
    fn sequence_on_multiple_streams() {
        assert_sequence!([1, 3, 5, 4, 2], [1, 3, 5], 1);
        assert_sequence!([1, 5, 4, 3, 2], [1, 5], 2);
        assert_sequence!([5, 3, 4, 2, 1], [5], 3);
        assert_sequence!([4, 3, 2, 1, 5], [4, 5], 4);
        assert_sequence!([2, 1, 4, 3, 5], [2, 4, 5], 5);
        assert_sequence!([5, 2, 1, 4, 3], [5], 6);
        assert_sequence!([3, 2, 4, 1, 5], [3, 4, 5], 7);
    }

    // Indices that roll over from `u16::MAX` to `0` are still newer; `65534` arriving after `0` is old.
    #[test]
    fn can_sequence_across_wraparound() {
        assert_sequence!(
            [30000, 60000, 65535, 0, 65534, 1],
            [30000, 60000, 65535, 0, 1],
            1
        );
    }
}