// atomr_fix/session.rs

//! FIX session-state machine.
//!
//! [`FixSession`] is a self-contained, runtime-agnostic driver for the FIX
//! session layer. It is deliberately *not* an `atomr` `Actor`: the FSM is a
//! plain async struct whose transitions are exercised by feeding it inbound
//! [`FixMessage`]s and observing the outbound messages it produces. This makes
//! the protocol logic unit-testable without a socket. A thin [`run`](FixSession::run)
//! helper wires the same FSM onto an `atomr-streams` TCP connection +
//! SOH framing for real deployments.
//!
//! # What the FSM covers
//!
//! * **Logon handshake** — [`build_logon`](FixSession::build_logon) emits a
//!   Logon with `HeartBtInt(108)` and an optional `ResetSeqNumFlag(141)`; an
//!   inbound Logon transitions the session to [`SessionState::Active`].
//! * **Heartbeats / test requests** — [`heartbeat`](FixSession::heartbeat)
//!   emits a Heartbeat on the configured interval; an inbound `TestRequest(1)`
//!   is answered with a Heartbeat echoing `TestReqID(112)`.
//! * **Sequence management** — every outbound message is stamped with the next
//!   `MsgSeqNum` from the [`FixSeqStore`]; every accepted inbound message
//!   advances the expected-inbound counter.
//! * **Gap detection + resend** — an inbound message whose `MsgSeqNum` is
//!   *higher* than expected triggers a `ResendRequest(2)` for the gap range.
//! * **Resend / gap-fill** — an inbound `ResendRequest(2)` is answered with a
//!   `SequenceReset(4)` carrying `GapFillFlag(123)=Y` (administrative messages
//!   are never resent; the gap is filled instead).
//! * **SequenceReset** — an inbound `SequenceReset` advances the expected
//!   inbound counter to `NewSeqNo(36)`.
//! * **Orderly logout** — [`build_logout`](FixSession::build_logout) emits a
//!   Logout; an inbound Logout is answered (if we did not initiate) and the
//!   session returns to [`SessionState::Disconnected`].
//!
//! # Template-level simplifications
//!
//! This is a real FSM, not a toy, but a few production behaviours are left as
//! documented extension points rather than implemented:
//!
//! * Resend of *application* messages replays via `SequenceReset`-GapFill only;
//!   a real engine would keep a per-session outbound message log and resend the
//!   original application messages with `PossDupFlag(43)=Y`. The hook for that
//!   is `FixSession::set_outbound_log`-style storage, intentionally omitted to
//!   keep the store contract narrow.
//! * `PossResend(97)` / `OrigSendingTime(122)` handling on inbound duplicates,
//!   message-level validation (required-field / value checks beyond the header),
//!   and logon authentication are out of scope.

use std::sync::Arc;
use std::time::Duration;

use bytes::Bytes;

use crate::message::{tags, FixMessage, MsgType};
use crate::seq_store::FixSeqStore;

/// FIX protocol version, selected at runtime (not via cargo features) so the
/// default build exercises every begin-string branch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FixVersion {
    /// FIX 4.2 (`BeginString` = `FIX.4.2`).
    Fix42,
    /// FIX 4.4 (`BeginString` = `FIX.4.4`).
    Fix44,
    /// FIX 5.0 SP2 over the FIXT transport (`BeginString` = `FIXT.1.1`).
    Fix50Sp2,
}

impl FixVersion {
    /// The `BeginString(8)` value for this version. FIX 5.0 SP2 uses the FIXT
    /// transport begin-string `FIXT.1.1`.
    pub fn begin_string(&self) -> &'static str {
        match self {
            FixVersion::Fix42 => "FIX.4.2",
            FixVersion::Fix44 => "FIX.4.4",
            FixVersion::Fix50Sp2 => "FIXT.1.1",
        }
    }

    /// Whether this version uses the FIXT transport (FIX 5.0+), which requires
    /// `DefaultApplVerID(1137)` on the Logon.
    pub fn is_fixt(&self) -> bool {
        matches!(self, FixVersion::Fix50Sp2)
    }
}

82/// Static configuration for a session.
83#[derive(Debug, Clone)]
84pub struct FixSessionConfig {
85    pub version: FixVersion,
86    pub sender_comp_id: String,
87    pub target_comp_id: String,
88    pub heartbeat: Duration,
89    /// Send `ResetSeqNumFlag(141)=Y` on logon (and reset our store).
90    pub reset_on_logon: bool,
91}
92
93impl FixSessionConfig {
94    pub fn new(
95        version: FixVersion,
96        sender_comp_id: impl Into<String>,
97        target_comp_id: impl Into<String>,
98    ) -> Self {
99        FixSessionConfig {
100            version,
101            sender_comp_id: sender_comp_id.into(),
102            target_comp_id: target_comp_id.into(),
103            heartbeat: Duration::from_secs(30),
104            reset_on_logon: false,
105        }
106    }
107}
108
/// Lifecycle state of the session FSM.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionState {
    /// No session established. This is the initial state and the state
    /// reached once a logout exchange has completed.
    Disconnected,
    /// We have built a Logon and are waiting for the counterparty's Logon.
    LogonSent,
    /// The logon handshake has completed.
    Active,
    /// We have built a Logout and are waiting for the counterparty's Logout.
    LogoutSent,
}

118/// The driver. Holds the FSM state, config, and the sequence store.
119pub struct FixSession {
120    config: FixSessionConfig,
121    store: Arc<dyn FixSeqStore>,
122    state: SessionState,
123}
124
125impl FixSession {
126    /// Construct a session from config and a sequence store.
127    pub fn new(config: FixSessionConfig, store: Arc<dyn FixSeqStore>) -> Self {
128        FixSession { config, store, state: SessionState::Disconnected }
129    }
130
131    pub fn state(&self) -> SessionState {
132        self.state
133    }
134
135    pub fn config(&self) -> &FixSessionConfig {
136        &self.config
137    }
138
139    pub fn store(&self) -> &Arc<dyn FixSeqStore> {
140        &self.store
141    }
142
143    // --- outbound message construction --------------------------------------
144
145    /// Stamp the standard header (BeginString, SenderCompID, TargetCompID,
146    /// MsgSeqNum) onto a message and return it ready to encode. Advances the
147    /// outbound sequence counter.
148    async fn finalize_outbound(&self, mut msg: FixMessage) -> FixMessage {
149        let seq = self.store.next_out().await;
150        msg.set(tags::BEGIN_STRING, self.config.version.begin_string());
151        msg.set(tags::SENDER_COMP_ID, self.config.sender_comp_id.clone());
152        msg.set(tags::TARGET_COMP_ID, self.config.target_comp_id.clone());
153        msg.set(tags::MSG_SEQ_NUM, seq.to_string());
154        msg.set(tags::SENDING_TIME, sending_time());
155        msg
156    }
157
158    /// Build and "send" a Logon. Transitions to [`SessionState::LogonSent`].
159    /// When [`reset_on_logon`](FixSessionConfig::reset_on_logon) is set, the
160    /// store is reset first and `ResetSeqNumFlag(141)=Y` is included.
161    pub async fn build_logon(&mut self) -> FixMessage {
162        if self.config.reset_on_logon {
163            self.store.reset().await;
164        }
165        let mut msg = FixMessage::of_type(MsgType::Logon);
166        msg.set(tags::ENCRYPT_METHOD, "0");
167        msg.set(tags::HEART_BT_INT, self.config.heartbeat.as_secs().to_string());
168        if self.config.reset_on_logon {
169            msg.set(tags::RESET_SEQ_NUM_FLAG, "Y");
170        }
171        if self.config.version.is_fixt() {
172            // FIX 5.0 SP2 application version (9 == FIX50SP2).
173            msg.set(tags::DEFAULT_APPL_VER_ID, "9");
174        }
175        let out = self.finalize_outbound(msg).await;
176        self.state = SessionState::LogonSent;
177        out
178    }
179
180    /// Build and "send" a Logout with optional reason text. Transitions to
181    /// [`SessionState::LogoutSent`].
182    pub async fn build_logout(&mut self, reason: Option<&str>) -> FixMessage {
183        let mut msg = FixMessage::of_type(MsgType::Logout);
184        if let Some(r) = reason {
185            msg.set(tags::TEXT, r);
186        }
187        let out = self.finalize_outbound(msg).await;
188        self.state = SessionState::LogoutSent;
189        out
190    }
191
192    /// Build a Heartbeat, optionally echoing a `TestReqID`.
193    pub async fn build_heartbeat(&mut self, test_req_id: Option<&str>) -> FixMessage {
194        let mut msg = FixMessage::of_type(MsgType::Heartbeat);
195        if let Some(id) = test_req_id {
196            msg.set(tags::TEST_REQ_ID, id);
197        }
198        self.finalize_outbound(msg).await
199    }
200
201    /// Periodic heartbeat (alias for `build_heartbeat(None)`), to be driven by
202    /// a timer on the [`heartbeat`](FixSessionConfig::heartbeat) interval.
203    pub async fn heartbeat(&mut self) -> FixMessage {
204        self.build_heartbeat(None).await
205    }
206
207    /// Build a `ResendRequest(2)` for `[begin, end]`. `end == 0` means
208    /// "infinity" per the FIX spec (all messages from `begin` onward).
209    pub async fn build_resend_request(&mut self, begin: u64, end: u64) -> FixMessage {
210        let mut msg = FixMessage::of_type(MsgType::ResendRequest);
211        msg.set(tags::BEGIN_SEQ_NO, begin.to_string());
212        msg.set(tags::END_SEQ_NO, end.to_string());
213        self.finalize_outbound(msg).await
214    }
215
216    /// Build a `SequenceReset(4)` with `GapFillFlag(123)=Y` advancing the
217    /// counterparty's expected inbound sequence to `new_seq_no`. The message is
218    /// stamped with `MsgSeqNum = begin_seq` rather than consuming a fresh
219    /// outbound number (gap-fill must occupy the slot of the first skipped
220    /// message).
221    pub async fn build_gap_fill(&self, begin_seq: u64, new_seq_no: u64) -> FixMessage {
222        let mut msg = FixMessage::of_type(MsgType::SequenceReset);
223        msg.set(tags::GAP_FILL_FLAG, "Y");
224        msg.set(tags::POSS_DUP_FLAG, "Y");
225        msg.set(tags::NEW_SEQ_NO, new_seq_no.to_string());
226        msg.set(tags::BEGIN_STRING, self.config.version.begin_string());
227        msg.set(tags::SENDER_COMP_ID, self.config.sender_comp_id.clone());
228        msg.set(tags::TARGET_COMP_ID, self.config.target_comp_id.clone());
229        msg.set(tags::MSG_SEQ_NUM, begin_seq.to_string());
230        msg.set(tags::SENDING_TIME, sending_time());
231        msg
232    }
233
234    /// Build an outbound application message of `msg_type`, stamping the header
235    /// and consuming an outbound sequence number. The caller sets the
236    /// application fields before/after as needed; this is the entry point the
237    /// application uses to send (e.g.) a `NewOrderSingle`.
238    pub async fn build_app_message(&self, mut msg: FixMessage) -> FixMessage {
239        // ensure tag 35 already set by caller
240        let seq = self.store.next_out().await;
241        msg.set(tags::BEGIN_STRING, self.config.version.begin_string());
242        msg.set(tags::SENDER_COMP_ID, self.config.sender_comp_id.clone());
243        msg.set(tags::TARGET_COMP_ID, self.config.target_comp_id.clone());
244        msg.set(tags::MSG_SEQ_NUM, seq.to_string());
245        msg.set(tags::SENDING_TIME, sending_time());
246        msg
247    }
248
249    // --- inbound handling ---------------------------------------------------
250
251    /// Process one inbound message and return the FSM's reaction:
252    /// the administrative replies to send and, if it is an application
253    /// message that should be surfaced, that message.
254    ///
255    /// This is the single, fully-testable entry point for the inbound side of
256    /// the protocol.
257    pub async fn handle_inbound(&mut self, msg: FixMessage) -> InboundOutcome {
258        let mut outcome = InboundOutcome::default();
259        let msg_type = msg.msg_type();
260        let seq = msg.seq_num();
261
262        // --- gap detection (applies to every sequenced inbound message) -----
263        // A SequenceReset with PossDupFlag/GapFill is allowed to carry a lower
264        // number, and we treat ResetSeqNumFlag logons specially below, so guard
265        // those cases.
266        let is_seq_reset = matches!(msg_type, Some(MsgType::SequenceReset));
267        let is_reset_logon = matches!(msg_type, Some(MsgType::Logon))
268            && msg.get(tags::RESET_SEQ_NUM_FLAG).map(|v| v == "Y" || v == "y").unwrap_or(false);
269
270        if let Some(seq) = seq {
271            if !is_seq_reset && !is_reset_logon {
272                let expected = self.store.current_in().await;
273                if seq > expected {
274                    // Gap: request resend of [expected, 0]=to-infinity. Do NOT
275                    // advance the inbound counter; we are waiting to fill the
276                    // gap first.
277                    let rr = self.build_resend_request(expected, 0).await;
278                    outcome.outbound.push(rr);
279                    outcome.gap_detected = Some((expected, seq));
280                    return outcome;
281                } else if seq < expected {
282                    // Stale/duplicate without PossDup — ignore (a real engine
283                    // would check PossDupFlag). Do not advance.
284                    outcome.ignored_duplicate = true;
285                    return outcome;
286                }
287            }
288        }
289
290        match msg_type {
291            Some(MsgType::Logon) => {
292                if is_reset_logon {
293                    self.store.reset().await;
294                }
295                if let Some(seq) = seq {
296                    self.store.observed_in(seq).await;
297                }
298                // If we initiated logon we are now Active; if the peer
299                // initiated, reply with our own Logon then go Active.
300                if self.state == SessionState::LogonSent {
301                    self.state = SessionState::Active;
302                } else {
303                    let reply = self.build_logon().await;
304                    outcome.outbound.push(reply);
305                    self.state = SessionState::Active;
306                }
307            }
308            Some(MsgType::TestRequest) => {
309                if let Some(seq) = seq {
310                    self.store.observed_in(seq).await;
311                }
312                let id = msg.get(tags::TEST_REQ_ID).map(|s| s.to_string());
313                let hb = self.build_heartbeat(id.as_deref()).await;
314                outcome.outbound.push(hb);
315            }
316            Some(MsgType::Heartbeat) => {
317                if let Some(seq) = seq {
318                    self.store.observed_in(seq).await;
319                }
320            }
321            Some(MsgType::ResendRequest) => {
322                if let Some(seq) = seq {
323                    self.store.observed_in(seq).await;
324                }
325                let begin = msg.get_u64(tags::BEGIN_SEQ_NO).unwrap_or(1);
326                let end = msg.get_u64(tags::END_SEQ_NO).unwrap_or(0);
327                // Resend policy: gap-fill the whole requested range. The new
328                // sequence number is one past our current outbound counter so
329                // the peer's expected-inbound resyncs to where we actually are.
330                let next_out = self.store.peek_out().await;
331                let new_seq_no = if end == 0 { next_out } else { (end + 1).max(next_out) };
332                let gap_fill = self.build_gap_fill(begin, new_seq_no).await;
333                outcome.outbound.push(gap_fill);
334            }
335            Some(MsgType::SequenceReset) => {
336                // Gap-fill or reset: trust NewSeqNo(36) and advance the expected
337                // inbound counter to it. (We do not enforce that GapFill resets
338                // only move forward here; that validation is an extension point.)
339                if let Some(new_seq) = msg.get_u64(tags::NEW_SEQ_NO) {
340                    // current_in becomes new_seq, so observe new_seq - 1.
341                    if new_seq >= 1 {
342                        self.store.observed_in(new_seq - 1).await;
343                    }
344                }
345            }
346            Some(MsgType::Logout) => {
347                if let Some(seq) = seq {
348                    self.store.observed_in(seq).await;
349                }
350                if self.state == SessionState::LogoutSent {
351                    // Our logout was acked: fully disconnected.
352                    self.state = SessionState::Disconnected;
353                } else {
354                    // Peer-initiated: ack with our own Logout, then disconnect.
355                    let reply = self.build_logout(None).await;
356                    outcome.outbound.push(reply);
357                    self.state = SessionState::Disconnected;
358                }
359            }
360            Some(MsgType::NewOrderSingle) | Some(MsgType::ExecutionReport) | Some(MsgType::Other(_)) => {
361                if let Some(seq) = seq {
362                    self.store.observed_in(seq).await;
363                }
364                outcome.application = Some(msg);
365            }
366            None => {
367                // No MsgType — malformed at the session layer; surface as ignored.
368                outcome.ignored_duplicate = true;
369            }
370        }
371
372        outcome
373    }
374
375    /// Drive this FSM over a live `atomr-streams` TCP connection.
376    ///
377    /// Reads SOH-delimited frames from `inbound` (already framed `Bytes`),
378    /// parses them, runs [`handle_inbound`](Self::handle_inbound), and writes
379    /// every produced outbound message to `writer`. Application messages are
380    /// forwarded to `app_tx`. Outbound application messages submitted on
381    /// `app_rx` are stamped and written. The loop ends when the inbound stream
382    /// closes or the session reaches [`SessionState::Disconnected`] after a
383    /// logout.
384    ///
385    /// This helper exists for real deployments; the protocol logic itself is
386    /// fully testable through [`handle_inbound`](Self::handle_inbound) without
387    /// any socket.
388    pub async fn run(
389        mut self,
390        inbound: atomr_streams::Source<Result<Bytes, atomr_streams::FramingError>>,
391        writer: tokio::sync::mpsc::UnboundedSender<Bytes>,
392        app_tx: tokio::sync::mpsc::UnboundedSender<FixMessage>,
393        mut app_rx: tokio::sync::mpsc::UnboundedReceiver<FixMessage>,
394    ) {
395        // Bridge the framed inbound Source into a channel using the public
396        // streams Sink, so we can `select!` it against the timer and app side.
397        let (frame_tx, mut frame_rx) =
398            tokio::sync::mpsc::unbounded_channel::<Result<Bytes, atomr_streams::FramingError>>();
399        tokio::spawn(async move {
400            atomr_streams::Sink::to_sender(inbound, frame_tx).await;
401        });
402
403        // Kick off with a logon.
404        let logon = self.build_logon().await;
405        let _ = writer.send(logon.to_wire());
406
407        let mut hb = tokio::time::interval(self.config.heartbeat);
408        hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
409
410        loop {
411            tokio::select! {
412                frame = frame_rx.recv() => {
413                    let Some(frame) = frame else { break; };
414                    let Ok(bytes) = frame else { continue; };
415                    let Ok(msg) = FixMessage::parse(&bytes) else { continue; };
416                    let outcome = self.handle_inbound(msg).await;
417                    for out in &outcome.outbound {
418                        let _ = writer.send(out.to_wire());
419                    }
420                    if let Some(app) = outcome.application {
421                        let _ = app_tx.send(app);
422                    }
423                    if self.state == SessionState::Disconnected {
424                        break;
425                    }
426                }
427                app = app_rx.recv() => {
428                    let Some(app) = app else { continue; };
429                    if self.state == SessionState::Active {
430                        let out = self.build_app_message(app).await;
431                        let _ = writer.send(out.to_wire());
432                    }
433                }
434                _ = hb.tick() => {
435                    if self.state == SessionState::Active {
436                        let hb_msg = self.heartbeat().await;
437                        let _ = writer.send(hb_msg.to_wire());
438                    }
439                }
440            }
441        }
442    }
443}
444
445/// The result of [`FixSession::handle_inbound`].
446#[derive(Debug, Default)]
447pub struct InboundOutcome {
448    /// Administrative replies the session wants to send, in order.
449    pub outbound: Vec<FixMessage>,
450    /// An application message to surface to the application, if any.
451    pub application: Option<FixMessage>,
452    /// `Some((expected, received))` when a sequence gap was detected.
453    pub gap_detected: Option<(u64, u64)>,
454    /// True when a stale duplicate / malformed message was ignored.
455    pub ignored_duplicate: bool,
456}
457
458/// A minimal UTC `SendingTime(52)` stamp (`YYYYMMDD-HH:MM:SS`). We avoid a
459/// chrono dependency: the value's exact resolution is not load-bearing for the
460/// session FSM, only its presence and format.
461fn sending_time() -> String {
462    let now =
463        std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_secs();
464    // Decompose seconds-since-epoch into a civil UTC date/time without chrono.
465    let days = now / 86_400;
466    let secs_of_day = now % 86_400;
467    let (h, m, s) = (secs_of_day / 3600, (secs_of_day % 3600) / 60, secs_of_day % 60);
468    let (y, mo, d) = civil_from_days(days as i64);
469    format!("{y:04}{mo:02}{d:02}-{h:02}:{m:02}:{s:02}")
470}
471
/// Convert days-since-Unix-epoch to a civil (year, month, day) using Howard
/// Hinnant's algorithm.
///
/// `z` may be negative (dates before 1970-01-01). The returned month is in
/// `1..=12` and the day in `1..=31`.
fn civil_from_days(z: i64) -> (i64, u32, u32) {
    // Shift the epoch to 0000-03-01 so that the leap day falls at the end of
    // the (March-based) year.
    let z = z + 719_468;
    // 400-year era; the adjustment makes the division floor for negative `z`.
    let era = (if z >= 0 { z } else { z - 146_096 }) / 146_097;
    // Day within the era, in [0, 146_096].
    let doe = (z - era * 146_097) as u64;
    // Year within the era, in [0, 399].
    let yoe = (doe - doe / 1460 + doe / 36_524 - doe / 146_096) / 365;
    let y = yoe as i64 + era * 400;
    // Day within the March-based year, in [0, 365].
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
    // March-based month index, in [0, 11].
    let mp = (5 * doy + 2) / 153;
    let d = (doy - (153 * mp + 2) / 5 + 1) as u32;
    let m = (if mp < 10 { mp + 3 } else { mp - 9 }) as u32;
    // January and February belong to the following civil year.
    (if m <= 2 { y + 1 } else { y }, m, d)
}