easyfix_session/
session_state.rs1use std::{
2 collections::{BTreeMap, HashSet},
3 ops::RangeInclusive,
4 time::Instant,
5};
6
7use easyfix_messages::{
8 fields::{FixString, SeqNum},
9 messages::FixtMessage,
10};
11use tracing::{instrument, trace};
12
13use crate::messages_storage::MessagesStorage;
14
15#[derive(Debug)]
16struct Messages(BTreeMap<SeqNum, Box<FixtMessage>>);
17
18impl Messages {
19 fn new() -> Messages {
20 Messages(BTreeMap::new())
21 }
22
23 fn first_seq(&self) -> Option<SeqNum> {
24 self.0.keys().next().copied()
25 }
26
27 fn enqueue(&mut self, seq_num: SeqNum, msg: Box<FixtMessage>) {
28 self.0.insert(seq_num, msg);
29 }
30
31 fn retrieve(&mut self, seq_num: SeqNum) -> Option<Box<FixtMessage>> {
32 self.0.remove(&seq_num)
33 }
34
35 fn clear(&mut self) {
36 self.0.clear();
37 }
38}
39
40#[derive(Debug)]
41pub(crate) struct State<S> {
42 enabled: bool,
43 received_logon: bool,
44 logon_sent: bool,
45 logout_sent_time: Option<Instant>,
46 reset_sent: bool,
47 reset_received: bool,
48 initiate: bool,
49 resend_range: Option<RangeInclusive<SeqNum>>,
50 last_sent_time: Instant,
51 last_received_time: Instant,
52
53 disconnected: bool,
54
55 next_expected_msg_seq_num: SeqNum,
62
63 queue: Messages,
64 messages_storage: S,
65
66 grace_period_test_req_ids: HashSet<FixString>,
67}
68
69impl<S: MessagesStorage> State<S> {
70 pub(crate) fn new(messages_storage: S) -> State<S> {
71 State {
72 enabled: true,
73 received_logon: false,
74 logon_sent: false,
75 logout_sent_time: None,
76 reset_sent: false,
77 reset_received: false,
78 initiate: false,
79 resend_range: None,
80 last_sent_time: Instant::now(),
81 last_received_time: Instant::now(),
82 disconnected: true,
83 next_expected_msg_seq_num: 0,
84 queue: Messages::new(),
85 messages_storage,
86 grace_period_test_req_ids: HashSet::new(),
87 }
88 }
89
90 pub fn enabled(&self) -> bool {
93 self.enabled
94 }
95
96 pub fn logon_received(&self) -> bool {
97 self.received_logon
98 }
99
100 pub fn set_logon_received(&mut self, logon_received: bool) {
101 self.received_logon = logon_received;
102 }
103
104 pub fn logon_sent(&self) -> bool {
105 self.logon_sent
106 }
107
108 pub fn set_logon_sent(&mut self, logon_sent: bool) {
109 self.logon_sent = logon_sent;
110 }
111
112 pub fn logout_sent_time(&self) -> Option<Instant> {
113 self.logout_sent_time
114 }
115
116 pub fn set_logout_sent_time(&mut self, logout_sent: bool) {
117 if logout_sent {
118 self.logout_sent_time = Some(Instant::now());
119 } else {
120 self.logout_sent_time = None;
121 }
122 }
123
124 pub fn reset_received(&self) -> bool {
125 self.reset_received
126 }
127
128 pub fn set_reset_received(&mut self, reset_received: bool) {
129 self.reset_received = reset_received;
130 }
131
132 pub fn reset_sent(&self) -> bool {
133 self.reset_sent
134 }
135
136 pub fn set_reset_sent(&mut self, reset_sent: bool) {
137 self.reset_sent = reset_sent;
138 }
139
140 pub fn initiate(&self) -> bool {
141 self.initiate
142 }
143
144 pub fn set_resend_range(&mut self, resend_range: RangeInclusive<SeqNum>) {
145 self.resend_range = Some(resend_range);
146 }
147
148 pub fn reset_resend_range(&mut self) {
149 self.resend_range = None;
150 }
151
152 pub fn resend_range(&self) -> Option<RangeInclusive<SeqNum>> {
153 self.resend_range.clone()
154 }
155
156 pub fn set_last_sent_time(&mut self, last_sent_time: Instant) {
157 self.last_sent_time = last_sent_time;
158 }
159
160 pub fn set_last_received_time(&mut self, last_received_time: Instant) {
161 self.last_received_time = last_received_time;
162 }
163
164 pub fn should_send_logon(&self) -> bool {
165 self.initiate() && !self.logon_sent()
166 }
167
168 pub fn set_reset_range_from_last_expected_logon_next_seq_num(&mut self) {
176 self.set_resend_range(self.next_expected_msg_seq_num..=0);
178 self.next_expected_msg_seq_num = 0;
180 }
181
182 pub fn set_last_expected_logon_next_seq_num(&mut self, seq_num: SeqNum) {
183 self.next_expected_msg_seq_num = seq_num;
184 }
185
186 pub fn is_expected_logon_next_seq_num_sent(&self) -> bool {
187 self.next_expected_msg_seq_num != 0
188 }
189
190 #[instrument(skip_all)]
191 pub fn enqueue_msg(&mut self, msg: Box<FixtMessage>) {
192 trace!(msg_seq_num = msg.header.msg_seq_num, msg_type = ?msg.msg_type());
193 self.queue.enqueue(msg.header.msg_seq_num, msg);
194 }
195
196 pub fn lowest_queued_seq_num(&self) -> Option<SeqNum> {
197 self.queue.first_seq()
198 }
199
200 pub fn retrieve_msg(&mut self) -> Option<Box<FixtMessage>> {
201 self.queue.retrieve(self.next_target_msg_seq_num())
202 }
203
204 pub fn clear_queue(&mut self) {
205 self.queue.clear();
206 }
207
208 pub fn fetch_range(&mut self, range: RangeInclusive<SeqNum>) -> impl Iterator<Item = &[u8]> {
209 self.messages_storage.fetch_range(range)
210 }
211
212 pub fn store(&mut self, seq_num: SeqNum, data: &[u8]) {
213 self.messages_storage.store(seq_num, data);
214 }
215
216 pub fn next_sender_msg_seq_num(&self) -> SeqNum {
217 self.messages_storage.next_sender_msg_seq_num()
218 }
219
220 pub fn next_target_msg_seq_num(&self) -> SeqNum {
221 self.messages_storage.next_target_msg_seq_num()
222 }
223
224 pub fn set_next_sender_msg_seq_num(&mut self, seq_num: SeqNum) {
225 self.messages_storage.set_next_sender_msg_seq_num(seq_num)
226 }
227
228 pub fn set_next_target_msg_seq_num(&mut self, seq_num: SeqNum) {
229 self.messages_storage.set_next_target_msg_seq_num(seq_num)
230 }
231
232 pub fn incr_next_sender_msg_seq_num(&mut self) {
233 self.messages_storage.incr_next_sender_msg_seq_num();
234 }
235
236 pub fn incr_next_target_msg_seq_num(&mut self) {
237 self.messages_storage.incr_next_target_msg_seq_num();
238 }
239
240 pub fn reset(&mut self) {
241 self.messages_storage.reset();
242 }
243
244 pub fn disconnect(&mut self, reset: bool) {
245 self.set_disconnected(true);
246
247 self.set_logout_sent_time(false);
248 self.set_reset_received(false);
249 self.set_reset_sent(false);
250 self.set_last_expected_logon_next_seq_num(0);
251 if reset {
252 self.reset();
253 }
254
255 self.reset_resend_range();
256 self.clear_queue();
257 }
258
259 pub fn disconnected(&self) -> bool {
260 self.disconnected
261 }
262
263 pub fn set_disconnected(&mut self, disconnected: bool) {
264 self.disconnected = disconnected;
265 }
266
267 pub fn input_timeout_cnt(&self) -> usize {
268 self.grace_period_test_req_ids.len()
269 }
270
271 pub fn register_grace_period_test_req_id(&mut self, test_req_id: FixString) {
272 self.grace_period_test_req_ids.insert(test_req_id);
273 }
274
275 pub fn validate_grace_period_test_req_id(&mut self, test_req_id: &FixString) {
276 if self.grace_period_test_req_ids.contains(test_req_id) {
277 self.reset_grace_period();
278 }
279 }
280
281 pub fn reset_grace_period(&mut self) {
282 self.grace_period_test_req_ids.clear();
283 }
284}