use std::collections::HashSet;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use bytes::Bytes;
use crate::errors::CoreError;
use crate::transport::session_transport::SessionTransport;
struct FaultState {
send_index: AtomicU64,
arm_drop: AtomicU64,
drop_indices: HashSet<u64>,
dup_indices: HashSet<u64>,
reorder_indices: HashSet<u64>,
pending_reorder: Mutex<Option<Vec<u8>>>,
delay_ms: AtomicU64,
}
enum SendFault {
Forward,
Drop,
Duplicate,
Reorder,
}
#[derive(Clone)]
pub struct FaultControl {
state: Arc<FaultState>,
}
impl FaultControl {
fn build(
drop_indices: HashSet<u64>,
dup_indices: HashSet<u64>,
reorder_indices: HashSet<u64>,
delay_ms: u64,
) -> Self {
Self {
state: Arc::new(FaultState {
send_index: AtomicU64::new(0),
arm_drop: AtomicU64::new(0),
drop_indices,
dup_indices,
reorder_indices,
pending_reorder: Mutex::new(None),
delay_ms: AtomicU64::new(delay_ms),
}),
}
}
pub fn new() -> Self {
Self::build(HashSet::new(), HashSet::new(), HashSet::new(), 0)
}
pub fn with_drop_indices(indices: &[u64]) -> Self {
Self::build(
indices.iter().copied().collect(),
HashSet::new(),
HashSet::new(),
0,
)
}
pub fn with_dup_indices(indices: &[u64]) -> Self {
Self::build(
HashSet::new(),
indices.iter().copied().collect(),
HashSet::new(),
0,
)
}
pub fn with_reorder_indices(indices: &[u64]) -> Self {
Self::build(
HashSet::new(),
HashSet::new(),
indices.iter().copied().collect(),
0,
)
}
pub fn with_delay(delay: Duration) -> Self {
Self::build(
HashSet::new(),
HashSet::new(),
HashSet::new(),
delay.as_millis() as u64,
)
}
pub fn arm_drop_next(&self, n: u64) {
self.state.arm_drop.store(n, Ordering::Relaxed);
}
pub fn set_delay(&self, delay: Duration) {
self.state
.delay_ms
.store(delay.as_millis() as u64, Ordering::Relaxed);
}
fn classify(&self) -> SendFault {
let index = self.state.send_index.fetch_add(1, Ordering::Relaxed);
if self.state.drop_indices.contains(&index) || self.consume_armed_drop() {
return SendFault::Drop;
}
if self.state.dup_indices.contains(&index) {
return SendFault::Duplicate;
}
if self.state.reorder_indices.contains(&index) {
return SendFault::Reorder;
}
SendFault::Forward
}
fn delay(&self) -> Option<Duration> {
match self.state.delay_ms.load(Ordering::Relaxed) {
0 => None,
ms => Some(Duration::from_millis(ms)),
}
}
fn take_pending_reorder(&self) -> Option<Vec<u8>> {
self.state.pending_reorder.lock().expect("poisoned").take()
}
fn hold_for_reorder(&self, data: Vec<u8>) -> Option<Vec<u8>> {
self.state
.pending_reorder
.lock()
.expect("poisoned")
.replace(data)
}
fn consume_armed_drop(&self) -> bool {
loop {
let pending = self.state.arm_drop.load(Ordering::Relaxed);
if pending == 0 {
return false;
}
if self
.state
.arm_drop
.compare_exchange_weak(pending, pending - 1, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
return true;
}
}
}
}
impl Default for FaultControl {
fn default() -> Self {
Self::new()
}
}
pub struct LossyTransport<T> {
inner: T,
control: FaultControl,
}
impl<T> LossyTransport<T> {
pub fn new(inner: T, control: FaultControl) -> Self {
Self { inner, control }
}
pub fn drop_sends(inner: T, indices: &[u64]) -> Self {
Self::new(inner, FaultControl::with_drop_indices(indices))
}
pub fn dup_sends(inner: T, indices: &[u64]) -> Self {
Self::new(inner, FaultControl::with_dup_indices(indices))
}
pub fn reorder_sends(inner: T, indices: &[u64]) -> Self {
Self::new(inner, FaultControl::with_reorder_indices(indices))
}
}
impl<T: SessionTransport> LossyTransport<T> {
pub async fn flush(&self) -> Result<(), CoreError> {
if let Some(held) = self.control.take_pending_reorder() {
self.inner.send_bytes(&held).await?;
}
Ok(())
}
}
impl<T: SessionTransport> SessionTransport for LossyTransport<T> {
async fn send_bytes(&self, data: &[u8]) -> Result<(), CoreError> {
if let Some(delay) = self.control.delay() {
tokio::time::sleep(delay).await;
}
match self.control.classify() {
SendFault::Drop => {
Ok(())
}
SendFault::Reorder => {
if let Some(prev) = self.control.hold_for_reorder(data.to_vec()) {
self.inner.send_bytes(&prev).await?;
}
Ok(())
}
SendFault::Duplicate => {
self.inner.send_bytes(data).await?;
self.inner.send_bytes(data).await?;
self.release_pending_reorder().await
}
SendFault::Forward => {
self.inner.send_bytes(data).await?;
self.release_pending_reorder().await
}
}
}
async fn recv_bytes(&self) -> Result<Bytes, CoreError> {
self.inner.recv_bytes().await
}
}
impl<T: SessionTransport> LossyTransport<T> {
async fn release_pending_reorder(&self) -> Result<(), CoreError> {
if let Some(held) = self.control.take_pending_reorder() {
self.inner.send_bytes(&held).await?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Mutex;
struct RecordingTransport {
forwarded: Arc<Mutex<Vec<Vec<u8>>>>,
}
impl SessionTransport for RecordingTransport {
async fn send_bytes(&self, data: &[u8]) -> Result<(), CoreError> {
self.forwarded.lock().expect("poisoned").push(data.to_vec());
Ok(())
}
async fn recv_bytes(&self) -> Result<Bytes, CoreError> {
Err(CoreError::NetworkError("recv unused in this test".into()))
}
}
#[tokio::test]
async fn lossy_transport_drops_configured_send_indices() {
let forwarded = Arc::new(Mutex::new(Vec::new()));
let inner = RecordingTransport {
forwarded: forwarded.clone(),
};
let lossy = LossyTransport::drop_sends(inner, &[1]);
lossy.send_bytes(b"f0").await.expect("send f0");
lossy.send_bytes(b"f1").await.expect("send f1"); lossy.send_bytes(b"f2").await.expect("send f2");
let got = forwarded.lock().expect("poisoned");
assert_eq!(
&*got,
&[b"f0".to_vec(), b"f2".to_vec()],
"frame at index 1 must be dropped, not forwarded to the inner transport"
);
}
#[tokio::test]
async fn arm_drop_next_drops_exactly_the_next_send() {
let forwarded = Arc::new(Mutex::new(Vec::new()));
let inner = RecordingTransport {
forwarded: forwarded.clone(),
};
let control = FaultControl::new();
let lossy = LossyTransport::new(inner, control.clone());
control.arm_drop_next(1); lossy.send_bytes(b"d0").await.expect("send d0"); lossy.send_bytes(b"d1").await.expect("send d1");
lossy.send_bytes(b"d2").await.expect("send d2");
let got = forwarded.lock().expect("poisoned");
assert_eq!(
&*got,
&[b"d1".to_vec(), b"d2".to_vec()],
"an armed drop of 1 must skip exactly the next send, then forward the rest"
);
}
#[tokio::test]
async fn dup_sends_forwards_configured_indices_twice() {
let forwarded = Arc::new(Mutex::new(Vec::new()));
let inner = RecordingTransport {
forwarded: forwarded.clone(),
};
let lossy = LossyTransport::dup_sends(inner, &[1]);
lossy.send_bytes(b"u0").await.expect("send u0");
lossy.send_bytes(b"u1").await.expect("send u1"); lossy.send_bytes(b"u2").await.expect("send u2");
let got = forwarded.lock().expect("poisoned");
assert_eq!(
&*got,
&[
b"u0".to_vec(),
b"u1".to_vec(),
b"u1".to_vec(),
b"u2".to_vec()
],
"the duplicated frame must reach the inner transport twice, in place"
);
}
#[tokio::test]
async fn reorder_sends_swaps_with_the_following_frame() {
let forwarded = Arc::new(Mutex::new(Vec::new()));
let inner = RecordingTransport {
forwarded: forwarded.clone(),
};
let lossy = LossyTransport::reorder_sends(inner, &[1]);
lossy.send_bytes(b"r0").await.expect("send r0");
lossy.send_bytes(b"r1").await.expect("send r1"); lossy.send_bytes(b"r2").await.expect("send r2");
let got = forwarded.lock().expect("poisoned");
assert_eq!(
&*got,
&[b"r0".to_vec(), b"r2".to_vec(), b"r1".to_vec()],
"the reordered frame must land after the following frame"
);
}
#[tokio::test]
async fn reorder_of_final_frame_is_released_by_flush() {
let forwarded = Arc::new(Mutex::new(Vec::new()));
let inner = RecordingTransport {
forwarded: forwarded.clone(),
};
let lossy = LossyTransport::reorder_sends(inner, &[1]);
lossy.send_bytes(b"r0").await.expect("send r0");
lossy.send_bytes(b"r1").await.expect("send r1");
assert_eq!(&*forwarded.lock().expect("poisoned"), &[b"r0".to_vec()]);
lossy.flush().await.expect("flush the held frame");
assert_eq!(
&*forwarded.lock().expect("poisoned"),
&[b"r0".to_vec(), b"r1".to_vec()],
"flush must release a reorder held past the final send"
);
}
#[tokio::test]
async fn delay_holds_each_send_for_at_least_the_configured_duration() {
let forwarded = Arc::new(Mutex::new(Vec::new()));
let inner = RecordingTransport {
forwarded: forwarded.clone(),
};
let control = FaultControl::with_delay(Duration::from_millis(25));
let lossy = LossyTransport::new(inner, control);
let start = tokio::time::Instant::now();
lossy.send_bytes(b"slow").await.expect("send slow");
assert!(
start.elapsed() >= Duration::from_millis(25),
"delay must hold the send for at least the configured duration"
);
assert_eq!(&*forwarded.lock().expect("poisoned"), &[b"slow".to_vec()]);
}
}