easyfix_session/
session_state.rs

1use std::{
2    collections::{BTreeMap, HashSet},
3    ops::RangeInclusive,
4    time::Instant,
5};
6
7use easyfix_messages::{
8    fields::{FixString, SeqNum},
9    messages::FixtMessage,
10};
11use tracing::{instrument, trace};
12
13use crate::messages_storage::MessagesStorage;
14
15#[derive(Debug)]
16struct Messages(BTreeMap<SeqNum, Box<FixtMessage>>);
17
18impl Messages {
19    fn new() -> Messages {
20        Messages(BTreeMap::new())
21    }
22
23    fn first_seq(&self) -> Option<SeqNum> {
24        self.0.keys().next().copied()
25    }
26
27    fn enqueue(&mut self, seq_num: SeqNum, msg: Box<FixtMessage>) {
28        self.0.insert(seq_num, msg);
29    }
30
31    fn retrieve(&mut self, seq_num: SeqNum) -> Option<Box<FixtMessage>> {
32        self.0.remove(&seq_num)
33    }
34
35    fn clear(&mut self) {
36        self.0.clear();
37    }
38}
39
40#[derive(Debug)]
41pub(crate) struct State<S> {
42    enabled: bool,
43    received_logon: bool,
44    logon_sent: bool,
45    logout_sent_time: Option<Instant>,
46    reset_sent: bool,
47    reset_received: bool,
48    initiate: bool,
49    resend_range: Option<RangeInclusive<SeqNum>>,
50    last_sent_time: Instant,
51    last_received_time: Instant,
52
53    disconnected: bool,
54
55    /// If this is anything other than zero it's the value of
56    /// the 789/NextExpectedMsgSeqNum tag in the last Logon message sent.
57    /// It is used to determine if the recipient has enough information
58    /// (assuming they support 789) to avoid the need for a resend request i.e.
59    /// they should be resending any necessary missing messages already.
60    /// This value is used to populate the resendRange if necessary.
61    next_expected_msg_seq_num: SeqNum,
62
63    queue: Messages,
64    messages_storage: S,
65
66    grace_period_test_req_ids: HashSet<FixString>,
67}
68
69impl<S: MessagesStorage> State<S> {
70    pub(crate) fn new(messages_storage: S) -> State<S> {
71        State {
72            enabled: true,
73            received_logon: false,
74            logon_sent: false,
75            logout_sent_time: None,
76            reset_sent: false,
77            reset_received: false,
78            initiate: false,
79            resend_range: None,
80            last_sent_time: Instant::now(),
81            last_received_time: Instant::now(),
82            disconnected: true,
83            next_expected_msg_seq_num: 0,
84            queue: Messages::new(),
85            messages_storage,
86            grace_period_test_req_ids: HashSet::new(),
87        }
88    }
89
90    ////
91
92    pub fn enabled(&self) -> bool {
93        self.enabled
94    }
95
96    pub fn logon_received(&self) -> bool {
97        self.received_logon
98    }
99
100    pub fn set_logon_received(&mut self, logon_received: bool) {
101        self.received_logon = logon_received;
102    }
103
104    pub fn logon_sent(&self) -> bool {
105        self.logon_sent
106    }
107
108    pub fn set_logon_sent(&mut self, logon_sent: bool) {
109        self.logon_sent = logon_sent;
110    }
111
112    pub fn logout_sent_time(&self) -> Option<Instant> {
113        self.logout_sent_time
114    }
115
116    pub fn set_logout_sent_time(&mut self, logout_sent: bool) {
117        if logout_sent {
118            self.logout_sent_time = Some(Instant::now());
119        } else {
120            self.logout_sent_time = None;
121        }
122    }
123
124    pub fn reset_received(&self) -> bool {
125        self.reset_received
126    }
127
128    pub fn set_reset_received(&mut self, reset_received: bool) {
129        self.reset_received = reset_received;
130    }
131
132    pub fn reset_sent(&self) -> bool {
133        self.reset_sent
134    }
135
136    pub fn set_reset_sent(&mut self, reset_sent: bool) {
137        self.reset_sent = reset_sent;
138    }
139
140    pub fn initiate(&self) -> bool {
141        self.initiate
142    }
143
144    pub fn set_resend_range(&mut self, resend_range: RangeInclusive<SeqNum>) {
145        self.resend_range = Some(resend_range);
146    }
147
148    pub fn reset_resend_range(&mut self) {
149        self.resend_range = None;
150    }
151
152    pub fn resend_range(&self) -> Option<RangeInclusive<SeqNum>> {
153        self.resend_range.clone()
154    }
155
156    pub fn set_last_sent_time(&mut self, last_sent_time: Instant) {
157        self.last_sent_time = last_sent_time;
158    }
159
160    pub fn set_last_received_time(&mut self, last_received_time: Instant) {
161        self.last_received_time = last_received_time;
162    }
163
164    pub fn should_send_logon(&self) -> bool {
165        self.initiate() && !self.logon_sent()
166    }
167
168    /// No actual resend request has occurred but at logon we populated tag
169    /// 789 so that the other side knows we are missing messages without
170    /// an explicit resend request and should immediately reply with
171    /// the missing messages.
172    ///
173    /// This is expected to be called only in the scenario where target is too
174    /// high on logon and tag 789 is supported.
175    pub fn set_reset_range_from_last_expected_logon_next_seq_num(&mut self) {
176        // we have already requested all msgs from nextExpectedMsgSeqNum to infinity
177        self.set_resend_range(self.next_expected_msg_seq_num..=0);
178        // clean up the variable (not really needed)
179        self.next_expected_msg_seq_num = 0;
180    }
181
182    pub fn set_last_expected_logon_next_seq_num(&mut self, seq_num: SeqNum) {
183        self.next_expected_msg_seq_num = seq_num;
184    }
185
186    pub fn is_expected_logon_next_seq_num_sent(&self) -> bool {
187        self.next_expected_msg_seq_num != 0
188    }
189
190    #[instrument(skip_all)]
191    pub fn enqueue_msg(&mut self, msg: Box<FixtMessage>) {
192        trace!(msg_seq_num = msg.header.msg_seq_num, msg_type = ?msg.msg_type());
193        self.queue.enqueue(msg.header.msg_seq_num, msg);
194    }
195
196    pub fn lowest_queued_seq_num(&self) -> Option<SeqNum> {
197        self.queue.first_seq()
198    }
199
200    pub fn retrieve_msg(&mut self) -> Option<Box<FixtMessage>> {
201        self.queue.retrieve(self.next_target_msg_seq_num())
202    }
203
204    pub fn clear_queue(&mut self) {
205        self.queue.clear();
206    }
207
208    pub fn fetch_range(&mut self, range: RangeInclusive<SeqNum>) -> impl Iterator<Item = &[u8]> {
209        self.messages_storage.fetch_range(range)
210    }
211
212    pub fn store(&mut self, seq_num: SeqNum, data: &[u8]) {
213        self.messages_storage.store(seq_num, data);
214    }
215
216    pub fn next_sender_msg_seq_num(&self) -> SeqNum {
217        self.messages_storage.next_sender_msg_seq_num()
218    }
219
220    pub fn next_target_msg_seq_num(&self) -> SeqNum {
221        self.messages_storage.next_target_msg_seq_num()
222    }
223
224    pub fn set_next_sender_msg_seq_num(&mut self, seq_num: SeqNum) {
225        self.messages_storage.set_next_sender_msg_seq_num(seq_num)
226    }
227
228    pub fn set_next_target_msg_seq_num(&mut self, seq_num: SeqNum) {
229        self.messages_storage.set_next_target_msg_seq_num(seq_num)
230    }
231
232    pub fn incr_next_sender_msg_seq_num(&mut self) {
233        self.messages_storage.incr_next_sender_msg_seq_num();
234    }
235
236    pub fn incr_next_target_msg_seq_num(&mut self) {
237        self.messages_storage.incr_next_target_msg_seq_num();
238    }
239
240    pub fn reset(&mut self) {
241        self.messages_storage.reset();
242    }
243
244    pub fn disconnect(&mut self, reset: bool) {
245        self.set_disconnected(true);
246
247        self.set_logout_sent_time(false);
248        self.set_reset_received(false);
249        self.set_reset_sent(false);
250        self.set_last_expected_logon_next_seq_num(0);
251        if reset {
252            self.reset();
253        }
254
255        self.reset_resend_range();
256        self.clear_queue();
257    }
258
259    pub fn disconnected(&self) -> bool {
260        self.disconnected
261    }
262
263    pub fn set_disconnected(&mut self, disconnected: bool) {
264        self.disconnected = disconnected;
265    }
266
267    pub fn input_timeout_cnt(&self) -> usize {
268        self.grace_period_test_req_ids.len()
269    }
270
271    pub fn register_grace_period_test_req_id(&mut self, test_req_id: FixString) {
272        self.grace_period_test_req_ids.insert(test_req_id);
273    }
274
275    pub fn validate_grace_period_test_req_id(&mut self, test_req_id: &FixString) {
276        if self.grace_period_test_req_ids.contains(test_req_id) {
277            self.reset_grace_period();
278        }
279    }
280
281    pub fn reset_grace_period(&mut self) {
282        self.grace_period_test_req_ids.clear();
283    }
284}