atomr_fix/session.rs
1//! FIX session-state machine.
2//!
3//! [`FixSession`] is a self-contained, runtime-agnostic driver for the FIX
4//! session layer. It is deliberately *not* an `atomr` `Actor`: the FSM is a
5//! plain async struct whose transitions are exercised by feeding it inbound
6//! [`FixMessage`]s and observing the outbound messages it produces. This makes
7//! the protocol logic unit-testable without a socket. A thin [`run`](FixSession::run)
8//! helper wires the same FSM onto an `atomr-streams` TCP connection +
9//! SOH framing for real deployments.
10//!
11//! # What the FSM covers
12//!
13//! * **Logon handshake** — [`build_logon`](FixSession::build_logon) emits a
14//! Logon with `HeartBtInt(108)` and an optional `ResetSeqNumFlag(141)`; an
15//! inbound Logon transitions the session to [`SessionState::Active`].
16//! * **Heartbeats / test requests** — [`heartbeat`](FixSession::heartbeat)
17//! emits a Heartbeat on the configured interval; an inbound `TestRequest(1)`
18//! is answered with a Heartbeat echoing `TestReqID(112)`.
//! * **Sequence management** — every outbound message is stamped with the next
//! `MsgSeqNum` from the [`FixSeqStore`] (except gap-fill `SequenceReset`s,
//! which reuse the sequence number of the first message they replace); every
//! accepted inbound message advances the expected-inbound counter.
22//! * **Gap detection + resend** — an inbound message whose `MsgSeqNum` is
23//! *higher* than expected triggers a `ResendRequest(2)` for the gap range.
24//! * **Resend / gap-fill** — an inbound `ResendRequest(2)` is answered with a
25//! `SequenceReset(4)` carrying `GapFillFlag(123)=Y` (administrative messages
26//! are never resent; the gap is filled instead).
27//! * **SequenceReset** — an inbound `SequenceReset` advances the expected
28//! inbound counter to `NewSeqNo(36)`.
29//! * **Orderly logout** — [`build_logout`](FixSession::build_logout) emits a
30//! Logout; an inbound Logout is answered (if we did not initiate) and the
31//! session returns to [`SessionState::Disconnected`].
32//!
33//! # Template-level simplifications
34//!
35//! This is a real FSM, not a toy, but a few production behaviours are left as
36//! documented extension points rather than implemented:
37//!
//! * Resend requests are answered with a `SequenceReset`-GapFill only, even for
//! ranges that contained *application* messages; a real engine would keep a
//! per-session outbound message log and resend the original application
//! messages with `PossDupFlag(43)=Y`. Such a log (e.g. behind a
//! `set_outbound_log`-style setter) is intentionally omitted to keep the store
//! contract narrow.
43//! * `PossResend(97)` / `OrigSendingTime(122)` handling on inbound duplicates,
44//! message-level validation (required-field / value checks beyond the header),
45//! and logon authentication are out of scope.
46
47use std::sync::Arc;
48use std::time::Duration;
49
50use bytes::Bytes;
51
52use crate::message::{tags, FixMessage, MsgType};
53use crate::seq_store::FixSeqStore;
54
/// Which FIX protocol version a session speaks.
///
/// The version is chosen at runtime rather than through cargo features, so a
/// default build compiles and exercises every begin-string branch.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FixVersion {
    /// FIX 4.2 (`BeginString` `FIX.4.2`).
    Fix42,
    /// FIX 4.4 (`BeginString` `FIX.4.4`).
    Fix44,
    /// FIX 5.0 SP2 carried over the FIXT 1.1 transport (`BeginString` `FIXT.1.1`).
    Fix50Sp2,
}
63
64impl FixVersion {
65 /// The `BeginString(8)` value for this version. FIX 5.0 SP2 uses the FIXT
66 /// transport begin-string `FIXT.1.1`.
67 pub fn begin_string(&self) -> &'static str {
68 match self {
69 FixVersion::Fix42 => "FIX.4.2",
70 FixVersion::Fix44 => "FIX.4.4",
71 FixVersion::Fix50Sp2 => "FIXT.1.1",
72 }
73 }
74
75 /// Whether this version uses the FIXT transport (FIX 5.0+), which requires
76 /// `DefaultApplVerID(1137)` on the Logon.
77 pub fn is_fixt(&self) -> bool {
78 matches!(self, FixVersion::Fix50Sp2)
79 }
80}
81
82/// Static configuration for a session.
83#[derive(Debug, Clone)]
84pub struct FixSessionConfig {
85 pub version: FixVersion,
86 pub sender_comp_id: String,
87 pub target_comp_id: String,
88 pub heartbeat: Duration,
89 /// Send `ResetSeqNumFlag(141)=Y` on logon (and reset our store).
90 pub reset_on_logon: bool,
91}
92
93impl FixSessionConfig {
94 pub fn new(
95 version: FixVersion,
96 sender_comp_id: impl Into<String>,
97 target_comp_id: impl Into<String>,
98 ) -> Self {
99 FixSessionConfig {
100 version,
101 sender_comp_id: sender_comp_id.into(),
102 target_comp_id: target_comp_id.into(),
103 heartbeat: Duration::from_secs(30),
104 reset_on_logon: false,
105 }
106 }
107}
108
/// Where the session FSM currently is in its logon/logout lifecycle.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SessionState {
    /// Initial state, and the state reached once a logout completes.
    Disconnected,
    /// We have built our Logon and are waiting for the counterparty's Logon.
    LogonSent,
    /// Logons have been exchanged; heartbeats and application traffic flow.
    Active,
    /// We have built our Logout and are waiting for the counterparty's Logout.
    LogoutSent,
}
117
118/// The driver. Holds the FSM state, config, and the sequence store.
119pub struct FixSession {
120 config: FixSessionConfig,
121 store: Arc<dyn FixSeqStore>,
122 state: SessionState,
123}
124
125impl FixSession {
126 /// Construct a session from config and a sequence store.
127 pub fn new(config: FixSessionConfig, store: Arc<dyn FixSeqStore>) -> Self {
128 FixSession { config, store, state: SessionState::Disconnected }
129 }
130
131 pub fn state(&self) -> SessionState {
132 self.state
133 }
134
135 pub fn config(&self) -> &FixSessionConfig {
136 &self.config
137 }
138
139 pub fn store(&self) -> &Arc<dyn FixSeqStore> {
140 &self.store
141 }
142
143 // --- outbound message construction --------------------------------------
144
145 /// Stamp the standard header (BeginString, SenderCompID, TargetCompID,
146 /// MsgSeqNum) onto a message and return it ready to encode. Advances the
147 /// outbound sequence counter.
148 async fn finalize_outbound(&self, mut msg: FixMessage) -> FixMessage {
149 let seq = self.store.next_out().await;
150 msg.set(tags::BEGIN_STRING, self.config.version.begin_string());
151 msg.set(tags::SENDER_COMP_ID, self.config.sender_comp_id.clone());
152 msg.set(tags::TARGET_COMP_ID, self.config.target_comp_id.clone());
153 msg.set(tags::MSG_SEQ_NUM, seq.to_string());
154 msg.set(tags::SENDING_TIME, sending_time());
155 msg
156 }
157
158 /// Build and "send" a Logon. Transitions to [`SessionState::LogonSent`].
159 /// When [`reset_on_logon`](FixSessionConfig::reset_on_logon) is set, the
160 /// store is reset first and `ResetSeqNumFlag(141)=Y` is included.
161 pub async fn build_logon(&mut self) -> FixMessage {
162 if self.config.reset_on_logon {
163 self.store.reset().await;
164 }
165 let mut msg = FixMessage::of_type(MsgType::Logon);
166 msg.set(tags::ENCRYPT_METHOD, "0");
167 msg.set(tags::HEART_BT_INT, self.config.heartbeat.as_secs().to_string());
168 if self.config.reset_on_logon {
169 msg.set(tags::RESET_SEQ_NUM_FLAG, "Y");
170 }
171 if self.config.version.is_fixt() {
172 // FIX 5.0 SP2 application version (9 == FIX50SP2).
173 msg.set(tags::DEFAULT_APPL_VER_ID, "9");
174 }
175 let out = self.finalize_outbound(msg).await;
176 self.state = SessionState::LogonSent;
177 out
178 }
179
180 /// Build and "send" a Logout with optional reason text. Transitions to
181 /// [`SessionState::LogoutSent`].
182 pub async fn build_logout(&mut self, reason: Option<&str>) -> FixMessage {
183 let mut msg = FixMessage::of_type(MsgType::Logout);
184 if let Some(r) = reason {
185 msg.set(tags::TEXT, r);
186 }
187 let out = self.finalize_outbound(msg).await;
188 self.state = SessionState::LogoutSent;
189 out
190 }
191
192 /// Build a Heartbeat, optionally echoing a `TestReqID`.
193 pub async fn build_heartbeat(&mut self, test_req_id: Option<&str>) -> FixMessage {
194 let mut msg = FixMessage::of_type(MsgType::Heartbeat);
195 if let Some(id) = test_req_id {
196 msg.set(tags::TEST_REQ_ID, id);
197 }
198 self.finalize_outbound(msg).await
199 }
200
201 /// Periodic heartbeat (alias for `build_heartbeat(None)`), to be driven by
202 /// a timer on the [`heartbeat`](FixSessionConfig::heartbeat) interval.
203 pub async fn heartbeat(&mut self) -> FixMessage {
204 self.build_heartbeat(None).await
205 }
206
207 /// Build a `ResendRequest(2)` for `[begin, end]`. `end == 0` means
208 /// "infinity" per the FIX spec (all messages from `begin` onward).
209 pub async fn build_resend_request(&mut self, begin: u64, end: u64) -> FixMessage {
210 let mut msg = FixMessage::of_type(MsgType::ResendRequest);
211 msg.set(tags::BEGIN_SEQ_NO, begin.to_string());
212 msg.set(tags::END_SEQ_NO, end.to_string());
213 self.finalize_outbound(msg).await
214 }
215
216 /// Build a `SequenceReset(4)` with `GapFillFlag(123)=Y` advancing the
217 /// counterparty's expected inbound sequence to `new_seq_no`. The message is
218 /// stamped with `MsgSeqNum = begin_seq` rather than consuming a fresh
219 /// outbound number (gap-fill must occupy the slot of the first skipped
220 /// message).
221 pub async fn build_gap_fill(&self, begin_seq: u64, new_seq_no: u64) -> FixMessage {
222 let mut msg = FixMessage::of_type(MsgType::SequenceReset);
223 msg.set(tags::GAP_FILL_FLAG, "Y");
224 msg.set(tags::POSS_DUP_FLAG, "Y");
225 msg.set(tags::NEW_SEQ_NO, new_seq_no.to_string());
226 msg.set(tags::BEGIN_STRING, self.config.version.begin_string());
227 msg.set(tags::SENDER_COMP_ID, self.config.sender_comp_id.clone());
228 msg.set(tags::TARGET_COMP_ID, self.config.target_comp_id.clone());
229 msg.set(tags::MSG_SEQ_NUM, begin_seq.to_string());
230 msg.set(tags::SENDING_TIME, sending_time());
231 msg
232 }
233
234 /// Build an outbound application message of `msg_type`, stamping the header
235 /// and consuming an outbound sequence number. The caller sets the
236 /// application fields before/after as needed; this is the entry point the
237 /// application uses to send (e.g.) a `NewOrderSingle`.
238 pub async fn build_app_message(&self, mut msg: FixMessage) -> FixMessage {
239 // ensure tag 35 already set by caller
240 let seq = self.store.next_out().await;
241 msg.set(tags::BEGIN_STRING, self.config.version.begin_string());
242 msg.set(tags::SENDER_COMP_ID, self.config.sender_comp_id.clone());
243 msg.set(tags::TARGET_COMP_ID, self.config.target_comp_id.clone());
244 msg.set(tags::MSG_SEQ_NUM, seq.to_string());
245 msg.set(tags::SENDING_TIME, sending_time());
246 msg
247 }
248
249 // --- inbound handling ---------------------------------------------------
250
251 /// Process one inbound message and return the FSM's reaction:
252 /// the administrative replies to send and, if it is an application
253 /// message that should be surfaced, that message.
254 ///
255 /// This is the single, fully-testable entry point for the inbound side of
256 /// the protocol.
257 pub async fn handle_inbound(&mut self, msg: FixMessage) -> InboundOutcome {
258 let mut outcome = InboundOutcome::default();
259 let msg_type = msg.msg_type();
260 let seq = msg.seq_num();
261
262 // --- gap detection (applies to every sequenced inbound message) -----
263 // A SequenceReset with PossDupFlag/GapFill is allowed to carry a lower
264 // number, and we treat ResetSeqNumFlag logons specially below, so guard
265 // those cases.
266 let is_seq_reset = matches!(msg_type, Some(MsgType::SequenceReset));
267 let is_reset_logon = matches!(msg_type, Some(MsgType::Logon))
268 && msg.get(tags::RESET_SEQ_NUM_FLAG).map(|v| v == "Y" || v == "y").unwrap_or(false);
269
270 if let Some(seq) = seq {
271 if !is_seq_reset && !is_reset_logon {
272 let expected = self.store.current_in().await;
273 if seq > expected {
274 // Gap: request resend of [expected, 0]=to-infinity. Do NOT
275 // advance the inbound counter; we are waiting to fill the
276 // gap first.
277 let rr = self.build_resend_request(expected, 0).await;
278 outcome.outbound.push(rr);
279 outcome.gap_detected = Some((expected, seq));
280 return outcome;
281 } else if seq < expected {
282 // Stale/duplicate without PossDup — ignore (a real engine
283 // would check PossDupFlag). Do not advance.
284 outcome.ignored_duplicate = true;
285 return outcome;
286 }
287 }
288 }
289
290 match msg_type {
291 Some(MsgType::Logon) => {
292 if is_reset_logon {
293 self.store.reset().await;
294 }
295 if let Some(seq) = seq {
296 self.store.observed_in(seq).await;
297 }
298 // If we initiated logon we are now Active; if the peer
299 // initiated, reply with our own Logon then go Active.
300 if self.state == SessionState::LogonSent {
301 self.state = SessionState::Active;
302 } else {
303 let reply = self.build_logon().await;
304 outcome.outbound.push(reply);
305 self.state = SessionState::Active;
306 }
307 }
308 Some(MsgType::TestRequest) => {
309 if let Some(seq) = seq {
310 self.store.observed_in(seq).await;
311 }
312 let id = msg.get(tags::TEST_REQ_ID).map(|s| s.to_string());
313 let hb = self.build_heartbeat(id.as_deref()).await;
314 outcome.outbound.push(hb);
315 }
316 Some(MsgType::Heartbeat) => {
317 if let Some(seq) = seq {
318 self.store.observed_in(seq).await;
319 }
320 }
321 Some(MsgType::ResendRequest) => {
322 if let Some(seq) = seq {
323 self.store.observed_in(seq).await;
324 }
325 let begin = msg.get_u64(tags::BEGIN_SEQ_NO).unwrap_or(1);
326 let end = msg.get_u64(tags::END_SEQ_NO).unwrap_or(0);
327 // Resend policy: gap-fill the whole requested range. The new
328 // sequence number is one past our current outbound counter so
329 // the peer's expected-inbound resyncs to where we actually are.
330 let next_out = self.store.peek_out().await;
331 let new_seq_no = if end == 0 { next_out } else { (end + 1).max(next_out) };
332 let gap_fill = self.build_gap_fill(begin, new_seq_no).await;
333 outcome.outbound.push(gap_fill);
334 }
335 Some(MsgType::SequenceReset) => {
336 // Gap-fill or reset: trust NewSeqNo(36) and advance the expected
337 // inbound counter to it. (We do not enforce that GapFill resets
338 // only move forward here; that validation is an extension point.)
339 if let Some(new_seq) = msg.get_u64(tags::NEW_SEQ_NO) {
340 // current_in becomes new_seq, so observe new_seq - 1.
341 if new_seq >= 1 {
342 self.store.observed_in(new_seq - 1).await;
343 }
344 }
345 }
346 Some(MsgType::Logout) => {
347 if let Some(seq) = seq {
348 self.store.observed_in(seq).await;
349 }
350 if self.state == SessionState::LogoutSent {
351 // Our logout was acked: fully disconnected.
352 self.state = SessionState::Disconnected;
353 } else {
354 // Peer-initiated: ack with our own Logout, then disconnect.
355 let reply = self.build_logout(None).await;
356 outcome.outbound.push(reply);
357 self.state = SessionState::Disconnected;
358 }
359 }
360 Some(MsgType::NewOrderSingle) | Some(MsgType::ExecutionReport) | Some(MsgType::Other(_)) => {
361 if let Some(seq) = seq {
362 self.store.observed_in(seq).await;
363 }
364 outcome.application = Some(msg);
365 }
366 None => {
367 // No MsgType — malformed at the session layer; surface as ignored.
368 outcome.ignored_duplicate = true;
369 }
370 }
371
372 outcome
373 }
374
375 /// Drive this FSM over a live `atomr-streams` TCP connection.
376 ///
377 /// Reads SOH-delimited frames from `inbound` (already framed `Bytes`),
378 /// parses them, runs [`handle_inbound`](Self::handle_inbound), and writes
379 /// every produced outbound message to `writer`. Application messages are
380 /// forwarded to `app_tx`. Outbound application messages submitted on
381 /// `app_rx` are stamped and written. The loop ends when the inbound stream
382 /// closes or the session reaches [`SessionState::Disconnected`] after a
383 /// logout.
384 ///
385 /// This helper exists for real deployments; the protocol logic itself is
386 /// fully testable through [`handle_inbound`](Self::handle_inbound) without
387 /// any socket.
388 pub async fn run(
389 mut self,
390 inbound: atomr_streams::Source<Result<Bytes, atomr_streams::FramingError>>,
391 writer: tokio::sync::mpsc::UnboundedSender<Bytes>,
392 app_tx: tokio::sync::mpsc::UnboundedSender<FixMessage>,
393 mut app_rx: tokio::sync::mpsc::UnboundedReceiver<FixMessage>,
394 ) {
395 // Bridge the framed inbound Source into a channel using the public
396 // streams Sink, so we can `select!` it against the timer and app side.
397 let (frame_tx, mut frame_rx) =
398 tokio::sync::mpsc::unbounded_channel::<Result<Bytes, atomr_streams::FramingError>>();
399 tokio::spawn(async move {
400 atomr_streams::Sink::to_sender(inbound, frame_tx).await;
401 });
402
403 // Kick off with a logon.
404 let logon = self.build_logon().await;
405 let _ = writer.send(logon.to_wire());
406
407 let mut hb = tokio::time::interval(self.config.heartbeat);
408 hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
409
410 loop {
411 tokio::select! {
412 frame = frame_rx.recv() => {
413 let Some(frame) = frame else { break; };
414 let Ok(bytes) = frame else { continue; };
415 let Ok(msg) = FixMessage::parse(&bytes) else { continue; };
416 let outcome = self.handle_inbound(msg).await;
417 for out in &outcome.outbound {
418 let _ = writer.send(out.to_wire());
419 }
420 if let Some(app) = outcome.application {
421 let _ = app_tx.send(app);
422 }
423 if self.state == SessionState::Disconnected {
424 break;
425 }
426 }
427 app = app_rx.recv() => {
428 let Some(app) = app else { continue; };
429 if self.state == SessionState::Active {
430 let out = self.build_app_message(app).await;
431 let _ = writer.send(out.to_wire());
432 }
433 }
434 _ = hb.tick() => {
435 if self.state == SessionState::Active {
436 let hb_msg = self.heartbeat().await;
437 let _ = writer.send(hb_msg.to_wire());
438 }
439 }
440 }
441 }
442 }
443}
444
445/// The result of [`FixSession::handle_inbound`].
446#[derive(Debug, Default)]
447pub struct InboundOutcome {
448 /// Administrative replies the session wants to send, in order.
449 pub outbound: Vec<FixMessage>,
450 /// An application message to surface to the application, if any.
451 pub application: Option<FixMessage>,
452 /// `Some((expected, received))` when a sequence gap was detected.
453 pub gap_detected: Option<(u64, u64)>,
454 /// True when a stale duplicate / malformed message was ignored.
455 pub ignored_duplicate: bool,
456}
457
458/// A minimal UTC `SendingTime(52)` stamp (`YYYYMMDD-HH:MM:SS`). We avoid a
459/// chrono dependency: the value's exact resolution is not load-bearing for the
460/// session FSM, only its presence and format.
461fn sending_time() -> String {
462 let now =
463 std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_secs();
464 // Decompose seconds-since-epoch into a civil UTC date/time without chrono.
465 let days = now / 86_400;
466 let secs_of_day = now % 86_400;
467 let (h, m, s) = (secs_of_day / 3600, (secs_of_day % 3600) / 60, secs_of_day % 60);
468 let (y, mo, d) = civil_from_days(days as i64);
469 format!("{y:04}{mo:02}{d:02}-{h:02}:{m:02}:{s:02}")
470}
471
/// Converts a count of days since 1970-01-01 into a proleptic-Gregorian
/// `(year, month, day)` triple, using Howard Hinnant's `civil_from_days`
/// algorithm. Negative inputs address dates before the epoch.
fn civil_from_days(z: i64) -> (i64, u32, u32) {
    // Re-base onto 0000-03-01 so the leap day falls at the end of each
    // computational year.
    let shifted = z + 719_468;
    // 400-year era containing the date (floor division for negatives).
    let era = (if shifted >= 0 { shifted } else { shifted - 146_096 }) / 146_097;
    let day_of_era = (shifted - era * 146_097) as u64; // [0, 146096]
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36_524 - day_of_era / 146_096) / 365; // [0, 399]
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100); // [0, 365]
    let month_index = (5 * day_of_year + 2) / 153; // [0, 11], 0 == March
    let day = (day_of_year - (153 * month_index + 2) / 5 + 1) as u32;
    let month = (if month_index < 10 { month_index + 3 } else { month_index - 9 }) as u32;
    // January and February belong to the following civil year.
    let year = year_of_era as i64 + era * 400 + i64::from(month <= 2);
    (year, month, day)
}