laminar/infrastructure/arranging/
sequencing.rs

1//! Module with logic for arranging items in-sequence on multiple streams.
2//!
3//! "_Sequencing is the process of only caring about the newest items._"
4//!
5//! With sequencing, we only care about the newest items. When old items arrive we just toss them away.
6//!
//! Example: the sequence `1,3,2,5,4` results in `1,3,5`.
8//!
9//! # Remarks
10//! - See [super-module](../index.html) description for more details.
11
12use std::{collections::HashMap, marker::PhantomData};
13
14use crate::packet::SequenceNumber;
15
16use super::{Arranging, ArrangingSystem};
17
18/// A sequencing system that can arrange items in sequence across different streams.
19///
20/// Checkout [`SequencingStream`](./struct.SequencingStream.html), or module description for more details.
21///
22/// # Remarks
23/// - See [super-module](../index.html) for more information about streams.
24pub struct SequencingSystem<T> {
25    // '[HashMap]' with streams on which items can be arranged in-sequence.
26    streams: HashMap<u8, SequencingStream<T>>,
27}
28
29impl<T> SequencingSystem<T> {
30    /// Constructs a new [`SequencingSystem`](./struct.SequencingSystem.html).
31    pub fn new() -> SequencingSystem<T> {
32        SequencingSystem {
33            streams: HashMap::with_capacity(32),
34        }
35    }
36}
37
38impl<T> ArrangingSystem for SequencingSystem<T> {
39    type Stream = SequencingStream<T>;
40
41    /// Returns the number of sequencing streams currently created.
42    fn stream_count(&self) -> usize {
43        self.streams.len()
44    }
45
46    /// Tries to get an [`SequencingStream`](./struct.SequencingStream.html) by `stream_id`.
47    /// When the stream does not exist, it will be inserted by the given `stream_id` and returned.
48    fn get_or_create_stream(&mut self, stream_id: u8) -> &mut Self::Stream {
49        self.streams
50            .entry(stream_id)
51            .or_insert_with(|| SequencingStream::new(stream_id))
52    }
53}
54
/// A single stream on which items are arranged in-sequence.
///
/// # Algorithm
///
/// The stream remembers the highest item index it has seen, the `top_index`, and compares every
/// `incoming_index` against it:
///
/// 1. `incoming_index` >= `top_index`:
///    the item is the newest, or at least as new as the last one seen, so it is returned to the
///    user.
/// 2. `incoming_index` < `top_index`:
///    the item is older than the newest item seen so far. Old items are of no interest, so it is
///    tossed away.
///
/// # Remarks
/// - See [super-module](../index.html) for more information about streams.
pub struct SequencingStream<T> {
    /// The id of this stream.
    _stream_id: u8,
    /// The highest seen item index.
    top_index: u16,
    /// Unique identifier which should be used for ordering on another stream, e.g. the remote
    /// endpoint.
    unique_item_identifier: u16,
    /// `T` is not stored anywhere, so `PhantomData` is needed to constrain it; without it the
    /// `Arranging` implementation could not use the generic.
    phantom: PhantomData<T>,
}
81
82impl<T> SequencingStream<T> {
83    /// Constructs a new, empty '[SequencingStream](./struct.SequencingStream.html)'.
84    ///
85    /// The default stream will have a capacity of 32 items.
86    pub fn new(stream_id: u8) -> SequencingStream<T> {
87        SequencingStream {
88            _stream_id: stream_id,
89            top_index: 0,
90            phantom: PhantomData,
91            unique_item_identifier: 0,
92        }
93    }
94
95    /// Returns the identifier of this stream.
96    #[cfg(test)]
97    pub fn stream_id(&self) -> u8 {
98        self._stream_id
99    }
100
101    /// Returns the unique identifier which should be used for ordering on an other stream e.g. the remote endpoint.
102    pub fn new_item_identifier(&mut self) -> SequenceNumber {
103        let id = self.unique_item_identifier;
104        self.unique_item_identifier = self.unique_item_identifier.wrapping_add(1);
105        id
106    }
107}
108
/// Reports whether `incoming` equals `start` or lies at most half the `u16` range ahead of it.
///
/// The forward distance from `start` to `incoming` is computed with wrapping arithmetic, so the
/// check keeps working when indices roll over from `u16::MAX` back to `0`.
///
/// Returns `true` when that distance is in `0..=32768`, `false` otherwise.
fn is_u16_within_half_window_from_start(start: u16, incoming: u16) -> bool {
    // Largest forward distance still accepted: `u16::MAX / 2 + 1` == 32768 (inclusive).
    const HALF_WINDOW: u16 = u16::MAX / 2 + 1;

    // `wrapping_sub` yields the distance modulo 2^16; a value slightly "behind" `start` shows up
    // as a very large distance and is therefore rejected.
    incoming.wrapping_sub(start) <= HALF_WINDOW
}
114
115impl<T> Arranging for SequencingStream<T> {
116    type ArrangingItem = T;
117
118    /// Arranges the given item based on a sequencing algorithm.
119    ///
120    /// With every sequencing operation an `top_index` is given.
121    ///
122    /// # Algorithm
123    ///
124    /// There are two scenarios that are important to us.
125    /// 1. `incoming_index` >= `top_index`.
126    /// This item is the newest or newer than the last one we have seen.
127    /// Because of that we should return it back to the user.
128    /// 2. `incoming_index` < `top_index`.
129    /// This item is older than we the newest packet we have seen so far.
130    /// Since we don't care about old items we can toss it a way.
131    ///
132    /// # Remark
133    /// - All old packets will be tossed away.
134    /// - None is returned when an old packet is received.
135    fn arrange(
136        &mut self,
137        incoming_index: u16,
138        item: Self::ArrangingItem,
139    ) -> Option<Self::ArrangingItem> {
140        if is_u16_within_half_window_from_start(self.top_index, incoming_index) {
141            self.top_index = incoming_index;
142            return Some(item);
143        }
144        None
145    }
146}
147
#[cfg(test)]
mod tests {
    use super::{Arranging, ArrangingSystem, SequencingSystem};

    /// Minimal stand-in for a packet: just its sequence index and the stream it travels on.
    #[derive(Debug, PartialEq, Clone)]
    struct Packet {
        pub sequence: u16,
        pub ordering_stream: u8,
    }

    impl Packet {
        fn new(sequence: u16, ordering_stream: u8) -> Packet {
            Packet {
                sequence,
                ordering_stream,
            }
        }
    }

    #[test]
    fn create_stream() {
        let mut system: SequencingSystem<Packet> = SequencingSystem::new();
        let stream = system.get_or_create_stream(1);

        assert_eq!(stream.stream_id(), 1);
    }

    #[test]
    fn create_existing_stream() {
        let mut system: SequencingSystem<Packet> = SequencingSystem::new();

        system.get_or_create_stream(1);
        let stream = system.get_or_create_stream(1);

        assert_eq!(stream.stream_id(), 1);
    }

    /// Asserts that the collection on the left results - after it is sequenced on the stream with
    /// the given id - in the collection on the right.
    macro_rules! assert_sequence {
        ( [$( $x:expr ),*], [$( $y:expr),*], $stream_id:expr) => {
            {
                // indices in arrival order (left-hand side).
                let before = [$($x,)*];

                // indices expected to survive sequencing (right-hand side).
                let after = [$($y,)*];

                // evaluate the stream id expression exactly once.
                let stream_id: u8 = $stream_id;

                // create system to handle sequenced packets.
                let mut sequence_system = SequencingSystem::<Packet>::new();

                // process the packets on the requested stream, not on a hard-coded one.
                let stream = sequence_system.get_or_create_stream(stream_id);

                // keep only the indices of the packets that the stream lets through.
                let sequenced_packets: Vec<u16> = before
                    .iter()
                    .copied()
                    .filter_map(|seq| {
                        stream
                            .arrange(seq, Packet::new(seq, stream_id))
                            .map(|p| p.sequence)
                    })
                    .collect();

                // the surviving indices must match the expected in-sequence range.
                assert_eq!(after.to_vec(), sequenced_packets);
            }
        };
    }

    // This will assert a bunch of ranges to a correct sequenced range.
    #[test]
    fn can_sequence() {
        assert_sequence!([1, 3, 5, 4, 2], [1, 3, 5], 1);
        assert_sequence!([1, 5, 4, 3, 2], [1, 5], 1);
        assert_sequence!([5, 3, 4, 2, 1], [5], 1);
        assert_sequence!([4, 3, 2, 1, 5], [4, 5], 1);
        assert_sequence!([2, 1, 4, 3, 5], [2, 4, 5], 1);
        assert_sequence!([5, 2, 1, 4, 3], [5], 1);
        assert_sequence!([3, 2, 4, 1, 5], [3, 4, 5], 1);
    }

    // Indices that roll over from `u16::MAX` to `0` still count as newer; a late arrival from
    // before the roll-over is tossed away.
    #[test]
    fn can_sequence_across_wrap_around() {
        assert_sequence!(
            [30000, 60000, 65535, 0, 1, 65535],
            [30000, 60000, 65535, 0, 1],
            1
        );
    }

    // This will assert a bunch of ranges to a correct sequenced range, each on its own stream id.
    #[test]
    fn sequence_on_multiple_streams() {
        assert_sequence!([1, 3, 5, 4, 2], [1, 3, 5], 1);
        assert_sequence!([1, 5, 4, 3, 2], [1, 5], 2);
        assert_sequence!([5, 3, 4, 2, 1], [5], 3);
        assert_sequence!([4, 3, 2, 1, 5], [4, 5], 4);
        assert_sequence!([2, 1, 4, 3, 5], [2, 4, 5], 5);
        assert_sequence!([5, 2, 1, 4, 3], [5], 6);
        assert_sequence!([3, 2, 4, 1, 5], [3, 4, 5], 7);
    }

    // Streams living in the same system keep their own `top_index`.
    #[test]
    fn streams_sequence_independently() {
        let mut system: SequencingSystem<Packet> = SequencingSystem::new();

        assert!(system
            .get_or_create_stream(1)
            .arrange(5, Packet::new(5, 1))
            .is_some());

        // stream 2 has not seen index 5, so index 3 is still new there.
        assert!(system
            .get_or_create_stream(2)
            .arrange(3, Packet::new(3, 2))
            .is_some());

        // stream 1 already saw index 5, so index 3 is old and gets tossed away.
        assert!(system
            .get_or_create_stream(1)
            .arrange(3, Packet::new(3, 1))
            .is_none());

        assert_eq!(system.stream_count(), 2);
    }
}