use std::sync::{
Arc,
atomic::{AtomicI64, Ordering},
};
use dashmap::DashMap;
use thiserror::Error;
pub const DEFAULT_SKIP_WINDOW: u32 = 16;
#[derive(Debug, Error, PartialEq, Eq)]
pub enum NonceError {
#[error("nonce manager not initialized for account={account_index}, api_key={api_key_index}")]
NotInitialized {
account_index: i64,
api_key_index: u8,
},
#[error(
"skip-window exhausted for account={account_index}, api_key={api_key_index}: outstanding={outstanding}, window={skip_window}"
)]
SkipWindowExhausted {
account_index: i64,
api_key_index: u8,
outstanding: u32,
skip_window: u32,
},
#[error(
"no outstanding nonce to roll back for account={account_index}, api_key={api_key_index}"
)]
NothingToRollBack {
account_index: i64,
api_key_index: u8,
},
}
#[derive(Debug)]
pub struct NonceManager {
skip_window: u32,
states: DashMap<(i64, u8), Arc<AccountNonce>>,
}
impl NonceManager {
#[must_use]
pub fn new(skip_window: u32) -> Self {
Self {
skip_window,
states: DashMap::new(),
}
}
#[must_use]
pub fn skip_window(&self) -> u32 {
self.skip_window
}
pub fn refresh(&self, account_index: i64, api_key_index: u8, venue_next_nonce: i64) {
let entry = self
.states
.entry((account_index, api_key_index))
.or_insert_with(|| Arc::new(AccountNonce::new(venue_next_nonce - 1)));
entry
.baseline
.store(venue_next_nonce - 1, Ordering::Release);
entry
.last_issued
.store(venue_next_nonce - 1, Ordering::Release);
}
pub fn next_nonce(&self, account_index: i64, api_key_index: u8) -> Result<i64, NonceError> {
let state = self.state_for(account_index, api_key_index)?;
loop {
let last = state.last_issued.load(Ordering::Acquire);
let baseline = state.baseline.load(Ordering::Acquire);
let next = last.wrapping_add(1);
let outstanding = next.saturating_sub(baseline);
if outstanding > i64::from(self.skip_window) {
return Err(NonceError::SkipWindowExhausted {
account_index,
api_key_index,
outstanding: u32::try_from(outstanding).unwrap_or(u32::MAX),
skip_window: self.skip_window,
});
}
if state
.last_issued
.compare_exchange_weak(last, next, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
return Ok(next);
}
}
}
pub fn ack_failure(&self, account_index: i64, api_key_index: u8) -> Result<i64, NonceError> {
let state = self.state_for(account_index, api_key_index)?;
loop {
let last = state.last_issued.load(Ordering::Acquire);
let baseline = state.baseline.load(Ordering::Acquire);
if last == baseline {
return Err(NonceError::NothingToRollBack {
account_index,
api_key_index,
});
}
let prev = last - 1;
if state
.last_issued
.compare_exchange_weak(last, prev, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
return Ok(last);
}
}
}
#[must_use]
pub fn last_issued(&self, account_index: i64, api_key_index: u8) -> Option<i64> {
self.states
.get(&(account_index, api_key_index))
.map(|s| s.last_issued.load(Ordering::Acquire))
}
#[must_use]
pub fn baseline(&self, account_index: i64, api_key_index: u8) -> Option<i64> {
self.states
.get(&(account_index, api_key_index))
.map(|s| s.baseline.load(Ordering::Acquire))
}
fn state_for(
&self,
account_index: i64,
api_key_index: u8,
) -> Result<Arc<AccountNonce>, NonceError> {
let entry =
self.states
.get(&(account_index, api_key_index))
.ok_or(NonceError::NotInitialized {
account_index,
api_key_index,
})?;
let state = entry.value().clone();
drop(entry);
Ok(state)
}
}
impl Default for NonceManager {
fn default() -> Self {
Self::new(DEFAULT_SKIP_WINDOW)
}
}
#[derive(Debug)]
struct AccountNonce {
last_issued: AtomicI64,
baseline: AtomicI64,
}
impl AccountNonce {
fn new(initial: i64) -> Self {
Self {
last_issued: AtomicI64::new(initial),
baseline: AtomicI64::new(initial),
}
}
}
#[cfg(test)]
mod tests {
use std::{sync::Arc as StdArc, thread};
use proptest::prelude::*;
use rstest::rstest;
use super::*;
const ACCOUNT: i64 = 12345;
const API_KEY: u8 = 5;
#[rstest]
fn next_nonce_uninitialized_errors() {
let mgr = NonceManager::new(8);
let err = mgr.next_nonce(ACCOUNT, API_KEY).expect_err("must error");
assert_eq!(
err,
NonceError::NotInitialized {
account_index: ACCOUNT,
api_key_index: API_KEY
},
);
}
#[rstest]
fn ack_failure_uninitialized_errors() {
let mgr = NonceManager::new(8);
let err = mgr.ack_failure(ACCOUNT, API_KEY).expect_err("must error");
assert_eq!(
err,
NonceError::NotInitialized {
account_index: ACCOUNT,
api_key_index: API_KEY
},
);
}
#[rstest]
fn default_uses_default_skip_window() {
let mgr = NonceManager::default();
assert_eq!(
mgr.skip_window(),
DEFAULT_SKIP_WINDOW,
"Default impl must use DEFAULT_SKIP_WINDOW, was {}",
mgr.skip_window(),
);
}
#[rstest]
fn last_issued_and_baseline_return_none_for_absent_key() {
let mgr = NonceManager::new(8);
assert_eq!(
mgr.last_issued(ACCOUNT, API_KEY),
None,
"absent key must report no last_issued",
);
assert_eq!(
mgr.baseline(ACCOUNT, API_KEY),
None,
"absent key must report no baseline",
);
}
#[rstest]
fn baseline_pins_to_refresh_value_through_allocations() {
let mgr = NonceManager::new(8);
mgr.refresh(ACCOUNT, API_KEY, 42);
assert_eq!(
mgr.baseline(ACCOUNT, API_KEY),
Some(41),
"baseline must equal venue_next_nonce - 1 after refresh",
);
for _ in 0..3 {
mgr.next_nonce(ACCOUNT, API_KEY).unwrap();
}
assert_eq!(
mgr.baseline(ACCOUNT, API_KEY),
Some(41),
"baseline must not move when next_nonce advances last_issued",
);
mgr.refresh(ACCOUNT, API_KEY, 100);
assert_eq!(
mgr.baseline(ACCOUNT, API_KEY),
Some(99),
"subsequent refresh must reset baseline to new venue value - 1",
);
}
#[rstest]
fn refresh_then_next_nonce_starts_at_venue_value() {
let mgr = NonceManager::new(8);
mgr.refresh(ACCOUNT, API_KEY, 42);
let n = mgr.next_nonce(ACCOUNT, API_KEY).unwrap();
assert_eq!(n, 42, "first nonce must equal venue baseline, was {n}");
}
#[rstest]
fn next_nonce_is_monotonic_and_gap_free() {
let mgr = NonceManager::new(64);
mgr.refresh(ACCOUNT, API_KEY, 0);
let issued: Vec<i64> = (0..32)
.map(|_| mgr.next_nonce(ACCOUNT, API_KEY).unwrap())
.collect();
let expected: Vec<i64> = (0..32).collect();
assert_eq!(
issued, expected,
"nonces must be monotonic and gap-free, was {issued:?}",
);
}
#[rstest]
fn skip_window_caps_outstanding_allocations() {
let mgr = NonceManager::new(4);
mgr.refresh(ACCOUNT, API_KEY, 100);
for _ in 0..4 {
mgr.next_nonce(ACCOUNT, API_KEY).unwrap();
}
let err = mgr.next_nonce(ACCOUNT, API_KEY).expect_err("must error");
match err {
NonceError::SkipWindowExhausted {
outstanding,
skip_window,
..
} => {
assert_eq!(skip_window, 4, "skip_window mismatch, was {skip_window}");
assert_eq!(outstanding, 5, "outstanding mismatch, was {outstanding}");
}
other => panic!("expected SkipWindowExhausted, was {other:?}"),
}
}
#[rstest]
fn ack_failure_rolls_back_most_recent_issuance() {
let mgr = NonceManager::new(8);
mgr.refresh(ACCOUNT, API_KEY, 0);
let issued = mgr.next_nonce(ACCOUNT, API_KEY).unwrap();
let rolled = mgr.ack_failure(ACCOUNT, API_KEY).unwrap();
assert_eq!(
rolled, issued,
"ack_failure must report rolled-back nonce, was {rolled}",
);
let reused = mgr.next_nonce(ACCOUNT, API_KEY).unwrap();
assert_eq!(
reused, issued,
"rolled-back nonce must be reissued, was {reused}"
);
}
#[rstest]
fn ack_failure_at_baseline_errors() {
let mgr = NonceManager::new(8);
mgr.refresh(ACCOUNT, API_KEY, 7);
let err = mgr.ack_failure(ACCOUNT, API_KEY).expect_err("must error");
assert_eq!(
err,
NonceError::NothingToRollBack {
account_index: ACCOUNT,
api_key_index: API_KEY
},
);
}
#[rstest]
fn refresh_resets_after_skip_window_exhausted() {
let mgr = NonceManager::new(2);
mgr.refresh(ACCOUNT, API_KEY, 0);
for _ in 0..2 {
mgr.next_nonce(ACCOUNT, API_KEY).unwrap();
}
assert!(
mgr.next_nonce(ACCOUNT, API_KEY).is_err(),
"window must trip"
);
mgr.refresh(ACCOUNT, API_KEY, 5);
let n = mgr.next_nonce(ACCOUNT, API_KEY).unwrap();
assert_eq!(n, 5, "refresh must re-arm allocation, was {n}");
}
#[rstest]
fn distinct_keys_track_independent_state() {
let mgr = NonceManager::new(8);
mgr.refresh(ACCOUNT, 0, 0);
mgr.refresh(ACCOUNT, 1, 100);
let a = mgr.next_nonce(ACCOUNT, 0).unwrap();
let b = mgr.next_nonce(ACCOUNT, 1).unwrap();
assert_eq!(a, 0, "key 0 must start at 0, was {a}");
assert_eq!(b, 100, "key 1 must start at 100, was {b}");
}
#[rstest]
fn concurrent_callers_see_no_duplicate_or_gap() {
let mgr = StdArc::new(NonceManager::new(10_000));
mgr.refresh(ACCOUNT, API_KEY, 0);
let threads = 8;
let per_thread = 250;
let handles: Vec<_> = (0..threads)
.map(|_| {
let mgr = StdArc::clone(&mgr);
thread::spawn(move || -> Vec<i64> {
(0..per_thread)
.map(|_| mgr.next_nonce(ACCOUNT, API_KEY).unwrap())
.collect()
})
})
.collect();
let mut all = Vec::with_capacity(threads * per_thread);
for h in handles {
all.extend(h.join().unwrap());
}
all.sort_unstable();
let expected: Vec<i64> = (0..(threads as i64) * (per_thread as i64)).collect();
assert_eq!(
all, expected,
"concurrent issuance must cover [0, N) without gaps or duplicates",
);
}
proptest! {
#[rstest]
fn prop_sequential_issuance_is_contiguous(
baseline in 0i64..1_000_000,
count in 1usize..256,
) {
let mgr = NonceManager::new(u32::MAX);
mgr.refresh(ACCOUNT, API_KEY, baseline);
let issued: Vec<i64> = (0..count)
.map(|_| mgr.next_nonce(ACCOUNT, API_KEY).unwrap())
.collect();
for (i, &n) in issued.iter().enumerate() {
prop_assert_eq!(n, baseline + i as i64);
}
prop_assert_eq!(
mgr.last_issued(ACCOUNT, API_KEY),
Some(baseline + count as i64 - 1),
);
}
#[rstest]
fn prop_ack_failure_is_idempotent_round_trip(
baseline in 0i64..1_000_000,
advance in 1usize..32,
) {
let mgr = NonceManager::new(u32::MAX);
mgr.refresh(ACCOUNT, API_KEY, baseline);
for _ in 0..advance - 1 {
mgr.next_nonce(ACCOUNT, API_KEY).unwrap();
}
let issued = mgr.next_nonce(ACCOUNT, API_KEY).unwrap();
let rolled = mgr.ack_failure(ACCOUNT, API_KEY).unwrap();
prop_assert_eq!(rolled, issued);
let reused = mgr.next_nonce(ACCOUNT, API_KEY).unwrap();
prop_assert_eq!(reused, issued);
}
}
}