ironfix_session/
sequence.rs1use ironfix_core::types::SeqNum;
12use std::sync::atomic::{AtomicU64, Ordering};
13
/// Holds two independent atomic counters: one for sender-side and one for
/// target-side sequence numbers.
///
/// Both counters are [`AtomicU64`] values, so every operation on the manager
/// works through a shared reference (`&self`).
#[derive(Debug)]
pub struct SequenceManager {
    /// Counter read by `next_sender_seq` and advanced by `allocate_sender_seq`.
    next_sender_seq: AtomicU64,
    /// Counter read by `next_target_seq` and used by `validate_incoming` as the
    /// expected value.
    next_target_seq: AtomicU64,
}
24
25impl SequenceManager {
26 #[must_use]
28 pub fn new() -> Self {
29 Self {
30 next_sender_seq: AtomicU64::new(1),
31 next_target_seq: AtomicU64::new(1),
32 }
33 }
34
35 #[must_use]
41 pub fn with_initial(sender_seq: u64, target_seq: u64) -> Self {
42 Self {
43 next_sender_seq: AtomicU64::new(sender_seq),
44 next_target_seq: AtomicU64::new(target_seq),
45 }
46 }
47
48 #[inline]
50 #[must_use]
51 pub fn next_sender_seq(&self) -> SeqNum {
52 SeqNum::new(self.next_sender_seq.load(Ordering::SeqCst))
53 }
54
55 #[inline]
57 #[must_use]
58 pub fn next_target_seq(&self) -> SeqNum {
59 SeqNum::new(self.next_target_seq.load(Ordering::SeqCst))
60 }
61
62 #[inline]
67 pub fn allocate_sender_seq(&self) -> SeqNum {
68 SeqNum::new(self.next_sender_seq.fetch_add(1, Ordering::SeqCst))
69 }
70
71 #[inline]
75 pub fn increment_target_seq(&self) {
76 self.next_target_seq.fetch_add(1, Ordering::SeqCst);
77 }
78
79 #[inline]
84 pub fn set_sender_seq(&self, seq: u64) {
85 self.next_sender_seq.store(seq, Ordering::SeqCst);
86 }
87
88 #[inline]
93 pub fn set_target_seq(&self, seq: u64) {
94 self.next_target_seq.store(seq, Ordering::SeqCst);
95 }
96
97 #[inline]
99 pub fn reset(&self) {
100 self.next_sender_seq.store(1, Ordering::SeqCst);
101 self.next_target_seq.store(1, Ordering::SeqCst);
102 }
103
104 #[must_use]
114 pub fn validate_incoming(&self, received: u64) -> SequenceResult {
115 let expected = self.next_target_seq.load(Ordering::SeqCst);
116
117 if received == expected {
118 SequenceResult::Ok
119 } else if received < expected {
120 SequenceResult::TooLow { expected, received }
121 } else {
122 SequenceResult::Gap { expected, received }
123 }
124 }
125}
126
127impl Default for SequenceManager {
128 fn default() -> Self {
129 Self::new()
130 }
131}
132
/// Outcome of comparing a received sequence number with the expected one,
/// as produced by [`SequenceManager::validate_incoming`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SequenceResult {
    /// The received number equals the expected number.
    Ok,
    /// The received number is smaller than the expected number.
    TooLow {
        /// The number that was expected.
        expected: u64,
        /// The number that was actually received.
        received: u64,
    },
    /// The received number is larger than the expected number.
    Gap {
        /// The number that was expected.
        expected: u64,
        /// The number that was actually received.
        received: u64,
    },
}
153
154impl SequenceResult {
155 #[must_use]
157 pub const fn is_ok(&self) -> bool {
158 matches!(self, Self::Ok)
159 }
160
161 #[must_use]
163 pub const fn is_gap(&self) -> bool {
164 matches!(self, Self::Gap { .. })
165 }
166
167 #[must_use]
169 pub const fn is_too_low(&self) -> bool {
170 matches!(self, Self::TooLow { .. })
171 }
172}
173
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed manager starts both counters at 1.
    #[test]
    fn test_sequence_manager_new() {
        let mgr = SequenceManager::new();
        assert_eq!(mgr.next_sender_seq().value(), 1);
        assert_eq!(mgr.next_target_seq().value(), 1);
    }

    /// `Default` must produce the same starting state as `new`.
    #[test]
    fn test_default_matches_new() {
        let mgr = SequenceManager::default();
        assert_eq!(mgr.next_sender_seq().value(), 1);
        assert_eq!(mgr.next_target_seq().value(), 1);
    }

    /// Allocation returns the pre-increment value and leaves the target
    /// counter untouched.
    #[test]
    fn test_allocate_sender_seq() {
        let mgr = SequenceManager::new();

        let seq1 = mgr.allocate_sender_seq();
        assert_eq!(seq1.value(), 1);
        assert_eq!(mgr.next_sender_seq().value(), 2);

        let seq2 = mgr.allocate_sender_seq();
        assert_eq!(seq2.value(), 2);
        assert_eq!(mgr.next_sender_seq().value(), 3);

        assert_eq!(mgr.next_target_seq().value(), 1);
    }

    /// Incrementing the target counter leaves the sender counter untouched.
    #[test]
    fn test_increment_target_seq() {
        let mgr = SequenceManager::new();

        mgr.increment_target_seq();
        assert_eq!(mgr.next_target_seq().value(), 2);

        mgr.increment_target_seq();
        assert_eq!(mgr.next_target_seq().value(), 3);

        assert_eq!(mgr.next_sender_seq().value(), 1);
    }

    /// `set_sender_seq` overwrites only the sender counter, and the next
    /// allocation continues from the new value.
    #[test]
    fn test_set_sender_seq() {
        let mgr = SequenceManager::new();

        mgr.set_sender_seq(42);
        assert_eq!(mgr.next_sender_seq().value(), 42);
        assert_eq!(mgr.next_target_seq().value(), 1);

        assert_eq!(mgr.allocate_sender_seq().value(), 42);
        assert_eq!(mgr.next_sender_seq().value(), 43);
    }

    /// Validation classifies equal / lower / higher numbers and reports the
    /// exact `expected` and `received` values.
    #[test]
    fn test_validate_incoming() {
        let mgr = SequenceManager::new();

        assert!(mgr.validate_incoming(1).is_ok());

        mgr.set_target_seq(5);
        assert!(mgr.validate_incoming(4).is_too_low());
        assert!(mgr.validate_incoming(5).is_ok());
        assert!(mgr.validate_incoming(10).is_gap());

        assert_eq!(mgr.validate_incoming(5), SequenceResult::Ok);
        assert_eq!(
            mgr.validate_incoming(4),
            SequenceResult::TooLow { expected: 5, received: 4 }
        );
        assert_eq!(
            mgr.validate_incoming(10),
            SequenceResult::Gap { expected: 5, received: 10 }
        );

        // Validation is read-only: the expected number must not have moved.
        assert_eq!(mgr.next_target_seq().value(), 5);
    }

    /// Exactly one predicate is true for each variant.
    #[test]
    fn test_sequence_result_predicates() {
        let ok = SequenceResult::Ok;
        assert!(ok.is_ok() && !ok.is_gap() && !ok.is_too_low());

        let low = SequenceResult::TooLow { expected: 5, received: 4 };
        assert!(!low.is_ok() && !low.is_gap() && low.is_too_low());

        let gap = SequenceResult::Gap { expected: 5, received: 10 };
        assert!(!gap.is_ok() && gap.is_gap() && !gap.is_too_low());
    }

    /// `reset` returns both counters to 1 regardless of their prior values.
    #[test]
    fn test_reset() {
        let mgr = SequenceManager::with_initial(100, 200);
        assert_eq!(mgr.next_sender_seq().value(), 100);
        assert_eq!(mgr.next_target_seq().value(), 200);

        mgr.reset();
        assert_eq!(mgr.next_sender_seq().value(), 1);
        assert_eq!(mgr.next_target_seq().value(), 1);
    }
}