use std::sync::atomic::{AtomicU64, Ordering};
/// Asynchronous store for a pair of sequence counters: an outbound counter
/// that is handed out and advanced, and an inbound "next expected" counter
/// that is raised as numbers are observed.
///
/// NOTE(review): the name suggests these are FIX session sequence numbers —
/// confirm against the callers.
///
/// Implementors must be `Send + Sync`; every method takes `&self`.
#[async_trait::async_trait]
pub trait FixSeqStore: Send + Sync {
    /// Hands out an outbound sequence number and advances the outbound counter.
    async fn next_out(&self) -> u64;

    /// Reports the outbound sequence number without advancing the counter.
    async fn peek_out(&self) -> u64;

    /// Records that inbound sequence number `n` has been observed.
    async fn observed_in(&self, n: u64);

    /// Reports the current inbound counter value.
    async fn current_in(&self) -> u64;

    /// Resets the store's counters.
    async fn reset(&self);
}
/// Sequence-number store that keeps both counters in process memory as
/// atomics; nothing is written anywhere else.
#[derive(Debug)]
pub struct InMemorySeqStore {
    /// Next outbound sequence number to hand out.
    out: AtomicU64,
    /// Next inbound sequence number that is expected.
    in_expected: AtomicU64,
}

impl InMemorySeqStore {
    /// Creates a store whose outbound and inbound counters both start at 1.
    pub fn new() -> Self {
        Self::with_counters(1, 1)
    }

    /// Creates a store with explicit starting counters.
    ///
    /// Either argument is raised to 1 when it is 0, so a counter never starts
    /// below 1.
    pub fn with_counters(next_out: u64, next_in: u64) -> Self {
        let first_out = next_out.max(1);
        let first_in = next_in.max(1);
        Self {
            out: AtomicU64::new(first_out),
            in_expected: AtomicU64::new(first_in),
        }
    }
}

impl Default for InMemorySeqStore {
    /// Same as [`InMemorySeqStore::new`]: both counters start at 1.
    fn default() -> Self {
        InMemorySeqStore::new()
    }
}
/// Atomic, in-memory implementation of [`FixSeqStore`].
///
/// Every operation uses `Ordering::SeqCst`. None of the methods actually
/// suspends; they are `async` only to satisfy the trait.
#[async_trait::async_trait]
impl FixSeqStore for InMemorySeqStore {
    /// Returns the current outbound sequence number and advances the counter
    /// by one.
    ///
    /// The read and the increment are a single atomic `fetch_add`, which
    /// wraps around on `u64` overflow.
    async fn next_out(&self) -> u64 {
        self.out.fetch_add(1, Ordering::SeqCst)
    }

    /// Returns the outbound sequence number that the next `next_out` call
    /// would hand out, without advancing the counter.
    async fn peek_out(&self) -> u64 {
        self.out.load(Ordering::SeqCst)
    }

    /// Records that inbound sequence number `n` was observed.
    ///
    /// The expected inbound number is raised to `n + 1` (saturating at
    /// `u64::MAX`) when that is larger than the stored value; it is never
    /// lowered, so observing an older number is a no-op.
    async fn observed_in(&self, n: u64) {
        let want = n.saturating_add(1);
        // `fetch_max` is the standard-library atomic "raise to at least
        // `want`" operation; it replaces a hand-written compare-exchange
        // retry loop with a single read-modify-write.
        self.in_expected.fetch_max(want, Ordering::SeqCst);
    }

    /// Returns the next inbound sequence number that is expected.
    async fn current_in(&self) -> u64 {
        self.in_expected.load(Ordering::SeqCst)
    }

    /// Sets both counters back to 1.
    ///
    /// The two stores are independent atomic writes, not one combined atomic
    /// step.
    async fn reset(&self) {
        self.out.store(1, Ordering::SeqCst);
        self.in_expected.store(1, Ordering::SeqCst);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `next_out` hands out 1, 2, 3 in order; `peek_out` then reports 4 and
    /// does not advance the counter.
    #[tokio::test]
    async fn next_out_increments_and_persists() {
        let store = InMemorySeqStore::new();
        for expected in 1..=3u64 {
            assert_eq!(store.next_out().await, expected);
        }
        // Peeking twice must yield the same value both times.
        for _ in 0..2 {
            assert_eq!(store.peek_out().await, 4);
        }
    }

    /// The expected inbound number follows the highest observed number plus
    /// one and ignores older observations.
    #[tokio::test]
    async fn observed_in_tracks_high_water_mark() {
        let store = InMemorySeqStore::new();
        assert_eq!(store.current_in().await, 1);

        // (observed number, expected `current_in` afterwards)
        let steps = [(1u64, 2u64), (2, 3), (1, 3), (9, 10)];
        for (seen, expected_next) in steps {
            store.observed_in(seen).await;
            assert_eq!(
                store.current_in().await,
                expected_next,
                "after observing {}",
                seen
            );
        }
    }

    /// `reset` brings both counters back to 1 regardless of their starting
    /// values.
    #[tokio::test]
    async fn reset_returns_to_initial() {
        let store = InMemorySeqStore::with_counters(50, 60);
        assert_eq!(store.peek_out().await, 50);
        assert_eq!(store.current_in().await, 60);

        store.reset().await;

        assert_eq!(store.peek_out().await, 1);
        assert_eq!(store.current_in().await, 1);
    }
}