atomr_fix/seq_store.rs
1//! Pluggable sequence-number store.
2//!
3//! FIX requires both peers to maintain a strictly increasing outbound
4//! `MsgSeqNum` and to track the highest inbound sequence number they have
5//! processed, so that a reconnect can detect gaps and issue / honour a
6//! `ResendRequest`. [`FixSeqStore`] abstracts that persistence point.
7//!
8//! [`InMemorySeqStore`] is the default, test-friendly implementation backed by
9//! atomics. A durable implementation (for example one backed by
10//! `atomr-persistence`) only needs to implement the same four async methods;
11//! the session layer interacts with the store exclusively through this trait,
12//! so swapping in a crash-safe store requires no changes to the FSM. We
13//! deliberately keep `atomr-persistence` out of this crate's dependency tree —
14//! this trait *is* the integration point.
15
16use std::sync::atomic::{AtomicU64, Ordering};
17
18/// Persistence for the two sequence counters a FIX session must keep.
19///
20/// * `next_out` returns the sequence number to stamp on the *next* outbound
21/// message and atomically advances the counter, so two concurrent sends can
22/// never reuse a number.
23/// * `observed_in` records that an inbound message with sequence number `n`
24/// was processed (tracking the high-water mark).
25/// * `current_in` returns the *next expected* inbound sequence number, i.e.
26/// one past the highest observed.
27/// * `reset` returns both counters to their initial state (used for
28/// `ResetSeqNumFlag=Y` logon and `SequenceReset`).
29#[async_trait::async_trait]
30pub trait FixSeqStore: Send + Sync {
31 /// The next outbound `MsgSeqNum`, advancing the counter.
32 async fn next_out(&self) -> u64;
33
34 /// Peek the next outbound `MsgSeqNum` without advancing (for building a
35 /// resend range or diagnostics).
36 async fn peek_out(&self) -> u64;
37
38 /// Record that an inbound message with `MsgSeqNum == n` was processed.
39 async fn observed_in(&self, n: u64);
40
41 /// The next *expected* inbound `MsgSeqNum` (one past the highest observed).
42 async fn current_in(&self) -> u64;
43
44 /// Reset both counters to 1 (next outbound = 1, next expected inbound = 1).
45 async fn reset(&self);
46}
47
48/// In-memory [`FixSeqStore`] backed by atomics. Counters start at 1, matching
49/// the FIX convention that the first message of a session carries
50/// `MsgSeqNum=1`.
51#[derive(Debug)]
52pub struct InMemorySeqStore {
53 /// Next outbound sequence number to hand out.
54 out: AtomicU64,
55 /// Next expected inbound sequence number.
56 in_expected: AtomicU64,
57}
58
59impl InMemorySeqStore {
60 /// A fresh store with both counters at 1.
61 pub fn new() -> Self {
62 InMemorySeqStore { out: AtomicU64::new(1), in_expected: AtomicU64::new(1) }
63 }
64
65 /// A store seeded with explicit counters — useful for simulating recovery
66 /// after a reconnect where the previous session left off mid-stream.
67 pub fn with_counters(next_out: u64, next_in: u64) -> Self {
68 InMemorySeqStore { out: AtomicU64::new(next_out.max(1)), in_expected: AtomicU64::new(next_in.max(1)) }
69 }
70}
71
72impl Default for InMemorySeqStore {
73 fn default() -> Self {
74 Self::new()
75 }
76}
77
78#[async_trait::async_trait]
79impl FixSeqStore for InMemorySeqStore {
80 async fn next_out(&self) -> u64 {
81 self.out.fetch_add(1, Ordering::SeqCst)
82 }
83
84 async fn peek_out(&self) -> u64 {
85 self.out.load(Ordering::SeqCst)
86 }
87
88 async fn observed_in(&self, n: u64) {
89 // High-water mark: next expected = max(current, n + 1).
90 let want = n.saturating_add(1);
91 let mut cur = self.in_expected.load(Ordering::SeqCst);
92 while want > cur {
93 match self.in_expected.compare_exchange_weak(cur, want, Ordering::SeqCst, Ordering::SeqCst) {
94 Ok(_) => break,
95 Err(actual) => cur = actual,
96 }
97 }
98 }
99
100 async fn current_in(&self) -> u64 {
101 self.in_expected.load(Ordering::SeqCst)
102 }
103
104 async fn reset(&self) {
105 self.out.store(1, Ordering::SeqCst);
106 self.in_expected.store(1, Ordering::SeqCst);
107 }
108}
109
110#[cfg(test)]
111mod tests {
112 use super::*;
113
114 #[tokio::test]
115 async fn next_out_increments_and_persists() {
116 let store = InMemorySeqStore::new();
117 assert_eq!(store.next_out().await, 1);
118 assert_eq!(store.next_out().await, 2);
119 assert_eq!(store.next_out().await, 3);
120 // peek does not advance.
121 assert_eq!(store.peek_out().await, 4);
122 assert_eq!(store.peek_out().await, 4);
123 }
124
125 #[tokio::test]
126 async fn observed_in_tracks_high_water_mark() {
127 let store = InMemorySeqStore::new();
128 assert_eq!(store.current_in().await, 1);
129 store.observed_in(1).await;
130 assert_eq!(store.current_in().await, 2);
131 store.observed_in(2).await;
132 assert_eq!(store.current_in().await, 3);
133 // An out-of-order lower number does not regress the high-water mark.
134 store.observed_in(1).await;
135 assert_eq!(store.current_in().await, 3);
136 // Jumping ahead advances it.
137 store.observed_in(9).await;
138 assert_eq!(store.current_in().await, 10);
139 }
140
141 #[tokio::test]
142 async fn reset_returns_to_initial() {
143 let store = InMemorySeqStore::with_counters(50, 60);
144 assert_eq!(store.peek_out().await, 50);
145 assert_eq!(store.current_in().await, 60);
146 store.reset().await;
147 assert_eq!(store.peek_out().await, 1);
148 assert_eq!(store.current_in().await, 1);
149 }
150}