Skip to main content

atomr_fix/
seq_store.rs

1//! Pluggable sequence-number store.
2//!
3//! FIX requires both peers to maintain a strictly increasing outbound
4//! `MsgSeqNum` and to track the highest inbound sequence number they have
5//! processed, so that a reconnect can detect gaps and issue / honour a
6//! `ResendRequest`. [`FixSeqStore`] abstracts that persistence point.
7//!
8//! [`InMemorySeqStore`] is the default, test-friendly implementation backed by
9//! atomics. A durable implementation (for example one backed by
10//! `atomr-persistence`) only needs to implement the same four async methods;
11//! the session layer interacts with the store exclusively through this trait,
12//! so swapping in a crash-safe store requires no changes to the FSM. We
13//! deliberately keep `atomr-persistence` out of this crate's dependency tree —
14//! this trait *is* the integration point.
15
16use std::sync::atomic::{AtomicU64, Ordering};
17
18/// Persistence for the two sequence counters a FIX session must keep.
19///
20/// * `next_out` returns the sequence number to stamp on the *next* outbound
21///   message and atomically advances the counter, so two concurrent sends can
22///   never reuse a number.
23/// * `observed_in` records that an inbound message with sequence number `n`
24///   was processed (tracking the high-water mark).
25/// * `current_in` returns the *next expected* inbound sequence number, i.e.
26///   one past the highest observed.
27/// * `reset` returns both counters to their initial state (used for
28///   `ResetSeqNumFlag=Y` logon and `SequenceReset`).
29#[async_trait::async_trait]
30pub trait FixSeqStore: Send + Sync {
31    /// The next outbound `MsgSeqNum`, advancing the counter.
32    async fn next_out(&self) -> u64;
33
34    /// Peek the next outbound `MsgSeqNum` without advancing (for building a
35    /// resend range or diagnostics).
36    async fn peek_out(&self) -> u64;
37
38    /// Record that an inbound message with `MsgSeqNum == n` was processed.
39    async fn observed_in(&self, n: u64);
40
41    /// The next *expected* inbound `MsgSeqNum` (one past the highest observed).
42    async fn current_in(&self) -> u64;
43
44    /// Reset both counters to 1 (next outbound = 1, next expected inbound = 1).
45    async fn reset(&self);
46}
47
48/// In-memory [`FixSeqStore`] backed by atomics. Counters start at 1, matching
49/// the FIX convention that the first message of a session carries
50/// `MsgSeqNum=1`.
51#[derive(Debug)]
52pub struct InMemorySeqStore {
53    /// Next outbound sequence number to hand out.
54    out: AtomicU64,
55    /// Next expected inbound sequence number.
56    in_expected: AtomicU64,
57}
58
59impl InMemorySeqStore {
60    /// A fresh store with both counters at 1.
61    pub fn new() -> Self {
62        InMemorySeqStore { out: AtomicU64::new(1), in_expected: AtomicU64::new(1) }
63    }
64
65    /// A store seeded with explicit counters — useful for simulating recovery
66    /// after a reconnect where the previous session left off mid-stream.
67    pub fn with_counters(next_out: u64, next_in: u64) -> Self {
68        InMemorySeqStore { out: AtomicU64::new(next_out.max(1)), in_expected: AtomicU64::new(next_in.max(1)) }
69    }
70}
71
72impl Default for InMemorySeqStore {
73    fn default() -> Self {
74        Self::new()
75    }
76}
77
78#[async_trait::async_trait]
79impl FixSeqStore for InMemorySeqStore {
80    async fn next_out(&self) -> u64 {
81        self.out.fetch_add(1, Ordering::SeqCst)
82    }
83
84    async fn peek_out(&self) -> u64 {
85        self.out.load(Ordering::SeqCst)
86    }
87
88    async fn observed_in(&self, n: u64) {
89        // High-water mark: next expected = max(current, n + 1).
90        let want = n.saturating_add(1);
91        let mut cur = self.in_expected.load(Ordering::SeqCst);
92        while want > cur {
93            match self.in_expected.compare_exchange_weak(cur, want, Ordering::SeqCst, Ordering::SeqCst) {
94                Ok(_) => break,
95                Err(actual) => cur = actual,
96            }
97        }
98    }
99
100    async fn current_in(&self) -> u64 {
101        self.in_expected.load(Ordering::SeqCst)
102    }
103
104    async fn reset(&self) {
105        self.out.store(1, Ordering::SeqCst);
106        self.in_expected.store(1, Ordering::SeqCst);
107    }
108}
109
110#[cfg(test)]
111mod tests {
112    use super::*;
113
114    #[tokio::test]
115    async fn next_out_increments_and_persists() {
116        let store = InMemorySeqStore::new();
117        assert_eq!(store.next_out().await, 1);
118        assert_eq!(store.next_out().await, 2);
119        assert_eq!(store.next_out().await, 3);
120        // peek does not advance.
121        assert_eq!(store.peek_out().await, 4);
122        assert_eq!(store.peek_out().await, 4);
123    }
124
125    #[tokio::test]
126    async fn observed_in_tracks_high_water_mark() {
127        let store = InMemorySeqStore::new();
128        assert_eq!(store.current_in().await, 1);
129        store.observed_in(1).await;
130        assert_eq!(store.current_in().await, 2);
131        store.observed_in(2).await;
132        assert_eq!(store.current_in().await, 3);
133        // An out-of-order lower number does not regress the high-water mark.
134        store.observed_in(1).await;
135        assert_eq!(store.current_in().await, 3);
136        // Jumping ahead advances it.
137        store.observed_in(9).await;
138        assert_eq!(store.current_in().await, 10);
139    }
140
141    #[tokio::test]
142    async fn reset_returns_to_initial() {
143        let store = InMemorySeqStore::with_counters(50, 60);
144        assert_eq!(store.peek_out().await, 50);
145        assert_eq!(store.current_in().await, 60);
146        store.reset().await;
147        assert_eq!(store.peek_out().await, 1);
148        assert_eq!(store.current_in().await, 1);
149    }
150}