use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
/// Default fatal-desync threshold installed by `SequenceTracker::new`.
///
/// A received frame whose forward distance from the expected sequence number
/// is at least this value (and below `FORWARD_BOUNDARY`) is classified as
/// `ReceivedValidation::FatalDesync`.
const FATAL_DESYNC_THRESHOLD: u8 = 5;
/// Split point of the wrapping 8-bit sequence space used by
/// `SequenceTracker::validate_received`.
///
/// With `distance = sequence.wrapping_sub(expected)`, distances below this
/// value are treated as the frame being ahead of the expected number; distances
/// at or above it are treated as the frame being behind (a duplicate).
const FORWARD_BOUNDARY: u8 = 128;
/// Classification of a received frame's sequence number relative to the
/// sequence number the receiver expected next.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReceivedValidation {
    /// The frame carried exactly the expected sequence number.
    Valid {
        /// Sequence number carried by the frame.
        sequence: u8,
    },
    /// The frame is behind the expected sequence number.
    Duplicate {
        /// Sequence number carried by the frame.
        sequence: u8,
        /// Sequence number the receiver was expecting.
        expected: u8,
    },
    /// The frame is ahead of the expected sequence number, but by less than
    /// the fatal-desync threshold.
    OutOfOrder {
        /// Sequence number carried by the frame.
        sequence: u8,
        /// Sequence number the receiver was expecting.
        expected: u8,
        /// Forward gap, computed as `sequence.wrapping_sub(expected)`.
        distance: u8,
    },
    /// The frame is ahead of the expected sequence number by at least the
    /// fatal-desync threshold.
    FatalDesync {
        /// Sequence number carried by the frame.
        sequence: u8,
        /// Sequence number the receiver was expecting.
        expected: u8,
        /// Forward gap, computed as `sequence.wrapping_sub(expected)`.
        distance: u8,
    },
}

impl ReceivedValidation {
    /// Returns `true` for `Valid` and `OutOfOrder`, `false` for `Duplicate`
    /// and `FatalDesync`.
    pub fn should_process(&self) -> bool {
        match self {
            Self::Valid { .. } | Self::OutOfOrder { .. } => true,
            Self::Duplicate { .. } | Self::FatalDesync { .. } => false,
        }
    }

    /// Returns `true` for every outcome except `FatalDesync`.
    pub fn should_ack(&self) -> bool {
        // Acknowledgement is withheld in exactly the case that demands a restart.
        !self.requires_restart()
    }

    /// Returns `true` only for `FatalDesync`.
    pub fn requires_restart(&self) -> bool {
        match self {
            Self::FatalDesync { .. } => true,
            Self::Valid { .. } | Self::Duplicate { .. } | Self::OutOfOrder { .. } => false,
        }
    }
}
/// Result of comparing an acknowledged sequence number against the most
/// recently sent one (see `SequenceTracker::validate_ack`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AckValidation {
/// The acknowledged sequence number equals the last one sent.
Valid,
/// The acknowledged sequence number differs from the last one sent:
/// `expected` is the last sent number, `actual` is the number that was acked.
Mismatch { expected: u8, actual: u8 },
}
/// Atomic event counters maintained by `SequenceTracker`.
///
/// Every counter starts at zero (the `Default` value of `AtomicU64`) and is
/// only ever incremented by the tracker using `Ordering::Relaxed`.
#[derive(Debug, Default)]
pub struct SequenceStats {
    /// Sequence numbers handed out by `SequenceTracker::next_sno`.
    frames_sent: AtomicU64,
    /// Received frames classified as `ReceivedValidation::Valid`.
    frames_received: AtomicU64,
    /// Received frames classified as `ReceivedValidation::Duplicate`.
    duplicates_detected: AtomicU64,
    /// Received frames classified as `ReceivedValidation::OutOfOrder`.
    out_of_order_detected: AtomicU64,
    /// Received frames classified as `ReceivedValidation::FatalDesync`.
    fatal_desyncs: AtomicU64,
    /// Calls to `SequenceTracker::reset`.
    resets: AtomicU64,
}
/// Plain-value copy of the tracker's counters, as returned by
/// `SequenceTracker::stats_snapshot`.
///
/// Each field is read with its own relaxed load, so the values are not
/// captured at a single instant when the tracker is being updated concurrently.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SequenceStatsSnapshot {
/// Sequence numbers handed out by `SequenceTracker::next_sno`.
pub frames_sent: u64,
/// Received frames classified as `ReceivedValidation::Valid`.
pub frames_received: u64,
/// Received frames classified as `ReceivedValidation::Duplicate`.
pub duplicates_detected: u64,
/// Received frames classified as `ReceivedValidation::OutOfOrder`.
pub out_of_order_detected: u64,
/// Received frames classified as `ReceivedValidation::FatalDesync`.
pub fatal_desyncs: u64,
/// Calls to `SequenceTracker::reset`.
pub resets: u64,
}
/// Wrapping 8-bit send/receive sequence counters with event statistics.
///
/// All mutable state is held in atomics, which is why every method takes
/// `&self`.
#[derive(Debug)]
pub struct SequenceTracker {
    /// Sequence number that the next call to `next_sno` will hand out.
    sno: AtomicU8,
    /// Sequence number expected on the next received frame.
    rno: AtomicU8,
    /// Forward distance at or above which a received frame is a fatal desync.
    fatal_desync_threshold: u8,
    /// Event counters exposed through `stats_snapshot`.
    stats: SequenceStats,
}

impl SequenceTracker {
    /// Creates a tracker with both counters at 0 and the default fatal-desync
    /// threshold (`FATAL_DESYNC_THRESHOLD`).
    pub fn new() -> Self {
        Self::with_desync_threshold(FATAL_DESYNC_THRESHOLD)
    }

    /// Creates a tracker with both counters at 0 and a custom fatal-desync
    /// threshold.
    ///
    /// Thresholds below 2 are raised to 2. A threshold of `FORWARD_BOUNDARY`
    /// (128) or more can never trigger, because only distances below that
    /// boundary are considered forward gaps.
    pub fn with_desync_threshold(threshold: u8) -> Self {
        Self {
            sno: AtomicU8::new(0),
            rno: AtomicU8::new(0),
            fatal_desync_threshold: threshold.max(2),
            stats: SequenceStats::default(),
        }
    }

    /// Allocates the next send sequence number.
    ///
    /// Returns the counter's value before the increment; the counter wraps
    /// from 255 back to 0. Also bumps the `frames_sent` statistic.
    pub fn next_sno(&self) -> u8 {
        // `fetch_add` on an atomic integer wraps on overflow and returns the
        // previous value, so no explicit compare-exchange loop is needed.
        let allocated = self.sno.fetch_add(1, Ordering::SeqCst);
        self.stats.frames_sent.fetch_add(1, Ordering::Relaxed);
        allocated
    }

    /// Returns the send counter, i.e. the number the next `next_sno` call
    /// will hand out (not the last number sent).
    pub fn current_sno(&self) -> u8 {
        self.sno.load(Ordering::SeqCst)
    }

    /// Returns the sequence number expected on the next received frame.
    pub fn current_rno(&self) -> u8 {
        self.rno.load(Ordering::SeqCst)
    }

    /// Classifies a received sequence number and updates the statistics.
    ///
    /// * `Valid` - `sequence` equals the expected number; the receive counter
    ///   is advanced by one (wrapping).
    /// * `Duplicate` - `sequence` is behind the expected number, i.e.
    ///   `sequence.wrapping_sub(expected) >= FORWARD_BOUNDARY`.
    /// * `FatalDesync` - `sequence` is ahead by at least the configured
    ///   threshold.
    /// * `OutOfOrder` - `sequence` is ahead by less than the threshold.
    ///
    /// Only `Valid` changes the receive counter.
    ///
    /// When several callers present the same expected sequence number at the
    /// same time, exactly one of them advances the counter and receives
    /// `Valid`; the others are classified against the updated counter.
    pub fn validate_received(&self, sequence: u8) -> ReceivedValidation {
        let mut expected = self.rno.load(Ordering::SeqCst);

        while sequence == expected {
            match self.rno.compare_exchange(
                expected,
                expected.wrapping_add(1),
                Ordering::SeqCst,
                Ordering::SeqCst,
            ) {
                Ok(_) => {
                    self.stats.frames_received.fetch_add(1, Ordering::Relaxed);
                    return ReceivedValidation::Valid { sequence };
                }
                // Someone else changed the counter first. This frame must not
                // be reported as `Valid` a second time, so re-evaluate it
                // against the value that is there now.
                Err(observed) => expected = observed,
            }
        }

        let distance = sequence.wrapping_sub(expected);

        // Backward half of the wrapping space. The immediate predecessor
        // (distance 255) lands here too, so it needs no special case.
        if distance >= FORWARD_BOUNDARY {
            self.stats
                .duplicates_detected
                .fetch_add(1, Ordering::Relaxed);
            return ReceivedValidation::Duplicate { sequence, expected };
        }

        if distance >= self.fatal_desync_threshold {
            self.stats.fatal_desyncs.fetch_add(1, Ordering::Relaxed);
            ReceivedValidation::FatalDesync {
                sequence,
                expected,
                distance,
            }
        } else {
            self.stats
                .out_of_order_detected
                .fetch_add(1, Ordering::Relaxed);
            ReceivedValidation::OutOfOrder {
                sequence,
                expected,
                distance,
            }
        }
    }

    /// Checks that `acked_sequence` is the most recently allocated send
    /// sequence number (send counter minus one, wrapping).
    ///
    /// Because the subtraction wraps, a tracker whose send counter is 0
    /// (freshly created, just reset, or just wrapped) expects an ack for 255.
    pub fn validate_ack(&self, acked_sequence: u8) -> AckValidation {
        let last_sent = self.sno.load(Ordering::SeqCst).wrapping_sub(1);
        if acked_sequence == last_sent {
            AckValidation::Valid
        } else {
            AckValidation::Mismatch {
                expected: last_sent,
                actual: acked_sequence,
            }
        }
    }

    /// Sets both sequence counters back to 0 and bumps the `resets` statistic.
    ///
    /// The other statistics are left untouched. The two counters are written
    /// by separate stores, not as one atomic operation.
    pub fn reset(&self) {
        self.sno.store(0, Ordering::SeqCst);
        self.rno.store(0, Ordering::SeqCst);
        self.stats.resets.fetch_add(1, Ordering::Relaxed);
    }

    /// Copies the current counter values into a plain `SequenceStatsSnapshot`.
    ///
    /// Each counter is read with its own relaxed load.
    pub fn stats_snapshot(&self) -> SequenceStatsSnapshot {
        SequenceStatsSnapshot {
            frames_sent: self.stats.frames_sent.load(Ordering::Relaxed),
            frames_received: self.stats.frames_received.load(Ordering::Relaxed),
            duplicates_detected: self.stats.duplicates_detected.load(Ordering::Relaxed),
            out_of_order_detected: self.stats.out_of_order_detected.load(Ordering::Relaxed),
            fatal_desyncs: self.stats.fatal_desyncs.load(Ordering::Relaxed),
            resets: self.stats.resets.load(Ordering::Relaxed),
        }
    }

    /// Builds a four-byte acknowledgement frame: `[4, channel_id, sequence, status]`.
    ///
    /// NOTE(review): the leading constant `4` is presumably a length or frame
    /// type byte; confirm against the wire-protocol specification.
    pub fn build_ack_frame(channel_id: u8, sequence: u8, status: u8) -> Vec<u8> {
        vec![4, channel_id, sequence, status]
    }
}

impl Default for SequenceTracker {
    /// Equivalent to [`SequenceTracker::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// In-order frames are accepted and each one advances the receive counter.
    #[test]
    fn test_valid_sequence() {
        let tracker = SequenceTracker::new();
        assert_eq!(tracker.current_rno(), 0);

        let first = tracker.validate_received(0);
        assert_eq!(first, ReceivedValidation::Valid { sequence: 0 });
        assert_eq!(tracker.current_rno(), 1);

        let second = tracker.validate_received(1);
        assert_eq!(second, ReceivedValidation::Valid { sequence: 1 });
        assert_eq!(tracker.current_rno(), 2);
    }

    /// Replaying the frame that was just accepted yields `Duplicate`, which is
    /// acked but not processed and leaves the receive counter alone.
    #[test]
    fn test_duplicate_detection() {
        let tracker = SequenceTracker::new();

        let accepted = tracker.validate_received(0);
        assert!(matches!(accepted, ReceivedValidation::Valid { .. }));
        assert_eq!(tracker.current_rno(), 1);

        let replayed = tracker.validate_received(0);
        assert_eq!(
            replayed,
            ReceivedValidation::Duplicate {
                sequence: 0,
                expected: 1,
            }
        );
        assert!(replayed.should_ack());
        assert!(!replayed.should_process());
        assert_eq!(tracker.current_rno(), 1);
    }

    /// A small forward gap is reported as `OutOfOrder` and is still processed
    /// and acked.
    #[test]
    fn test_out_of_order() {
        let tracker = SequenceTracker::new();

        let outcome = tracker.validate_received(2);
        assert_eq!(
            outcome,
            ReceivedValidation::OutOfOrder {
                sequence: 2,
                expected: 0,
                distance: 2,
            }
        );
        assert!(outcome.should_process());
        assert!(outcome.should_ack());
    }

    /// A forward gap equal to the default threshold is a fatal desync.
    #[test]
    fn test_fatal_desync() {
        let tracker = SequenceTracker::new();

        let outcome = tracker.validate_received(5);
        assert_eq!(
            outcome,
            ReceivedValidation::FatalDesync {
                sequence: 5,
                expected: 0,
                distance: 5,
            }
        );
        assert!(outcome.requires_restart());
        assert!(!outcome.should_ack());
    }

    /// The receive counter wraps from 255 back to 0 and keeps accepting frames.
    #[test]
    fn test_wrapping_at_255() {
        let tracker = SequenceTracker::new();

        for seq in 0..255u8 {
            let outcome = tracker.validate_received(seq);
            assert!(matches!(outcome, ReceivedValidation::Valid { .. }));
        }
        assert_eq!(tracker.current_rno(), 255);

        let at_max = tracker.validate_received(255);
        assert!(matches!(at_max, ReceivedValidation::Valid { .. }));
        assert_eq!(tracker.current_rno(), 0);

        let after_wrap = tracker.validate_received(0);
        assert!(matches!(after_wrap, ReceivedValidation::Valid { .. }));
        assert_eq!(tracker.current_rno(), 1);
    }

    /// Right after the counter wraps to 0, frame 255 is recognised as a
    /// duplicate.
    #[test]
    fn test_duplicate_at_wrap_boundary() {
        let tracker = SequenceTracker::new();

        // Consume one full cycle of sequence numbers.
        for seq in 0..=255u8 {
            tracker.validate_received(seq);
        }
        assert_eq!(tracker.current_rno(), 0);

        let replayed = tracker.validate_received(255);
        assert_eq!(
            replayed,
            ReceivedValidation::Duplicate {
                sequence: 255,
                expected: 0,
            }
        );
    }

    /// The send counter hands out 0..=255 in order and then wraps to 0.
    #[test]
    fn test_sno_cas_wrapping() {
        let tracker = SequenceTracker::new();

        for expected in 0..=255u8 {
            let allocated = tracker.next_sno();
            assert_eq!(allocated, expected);
        }
        assert_eq!(tracker.next_sno(), 0);
    }

    /// Only an ack for the most recently sent sequence number is valid.
    #[test]
    fn test_ack_validation() {
        let tracker = SequenceTracker::new();

        let sent = tracker.next_sno();
        assert_eq!(sent, 0);

        assert_eq!(tracker.validate_ack(0), AckValidation::Valid);
        assert_eq!(
            tracker.validate_ack(1),
            AckValidation::Mismatch {
                expected: 0,
                actual: 1,
            }
        );
    }

    /// `reset` zeroes both counters and is counted in the statistics.
    #[test]
    fn test_reset() {
        let tracker = SequenceTracker::new();

        tracker.next_sno();
        tracker.next_sno();
        tracker.validate_received(0);
        tracker.validate_received(1);
        assert_eq!(tracker.current_sno(), 2);
        assert_eq!(tracker.current_rno(), 2);

        tracker.reset();

        assert_eq!(tracker.current_sno(), 0);
        assert_eq!(tracker.current_rno(), 0);
        assert_eq!(tracker.stats_snapshot().resets, 1);
    }

    /// One frame of each classification bumps exactly its own counter.
    #[test]
    fn test_stats_tracking() {
        let tracker = SequenceTracker::new();

        tracker.validate_received(0); // valid
        tracker.validate_received(0); // duplicate
        tracker.validate_received(3); // out of order (gap of 2)
        tracker.validate_received(10); // fatal desync (gap of 9)

        let snapshot = tracker.stats_snapshot();
        assert_eq!(snapshot.frames_received, 1);
        assert_eq!(snapshot.duplicates_detected, 1);
        assert_eq!(snapshot.out_of_order_detected, 1);
        assert_eq!(snapshot.fatal_desyncs, 1);
    }

    /// A custom threshold moves the boundary between out-of-order and fatal.
    #[test]
    fn test_custom_threshold() {
        let tracker = SequenceTracker::with_desync_threshold(10);

        let below = tracker.validate_received(5);
        assert!(matches!(below, ReceivedValidation::OutOfOrder { .. }));

        let at_threshold = tracker.validate_received(10);
        assert!(matches!(at_threshold, ReceivedValidation::FatalDesync { .. }));
    }

    /// Ack frames are `[4, channel_id, sequence, status]`.
    #[test]
    fn test_build_ack_frame() {
        let ok_frame = SequenceTracker::build_ack_frame(3, 7, 0);
        assert_eq!(ok_frame, vec![4, 3, 7, 0]);

        let error_frame = SequenceTracker::build_ack_frame(3, 7, 0x21);
        assert_eq!(error_frame, vec![4, 3, 7, 0x21]);
    }
}