#![cfg(not(loom))]
use std::{future::Future, pin::Pin};
use futures::FutureExt;
use rstest::{fixture, rstest};
use serial_test::serial;
use tokio::time::{self, Duration};
use tokio_util::sync::CancellationToken;
use wireframe::{
connection::{ConnectionActor, FairnessConfig},
push::{PushError, PushHandle, PushPriority, PushQueues},
};
use wireframe_testing::{TestResult, push_expect};
#[path = "common/interleaved_push_helpers.rs"]
mod interleaved_push_helpers;
/// rstest fixture that hands a test a newly constructed `CancellationToken`.
#[rustfmt::skip]
#[fixture]
fn shutdown_token() -> CancellationToken {
CancellationToken::new()
}
/// Forwards `fairness` and `setup` to
/// `interleaved_push_helpers::run_actor_with_fairness` and passes the frames
/// it returns to `assertions`.
///
/// An error from the helper is propagated to the caller and `assertions` is
/// not invoked in that case.
async fn run_actor_test<S, SFut, A>(
    fairness: FairnessConfig,
    setup: S,
    assertions: A,
) -> TestResult
where
    S: FnOnce(PushHandle<u8>) -> SFut,
    SFut: Future<Output = TestResult>,
    A: FnOnce(Vec<u8>),
{
    let collected = interleaved_push_helpers::run_actor_with_fairness(fairness, setup);
    assertions(collected.await?);
    Ok(())
}
/// Receives three items from `queues` and returns the second tuple field of
/// each, in the order they were received.
///
/// Returns the error `recv N failed` as soon as the N-th `recv` yields `None`.
async fn drain_three(queues: &mut PushQueues<u8>) -> TestResult<(u8, u8, u8)> {
    let first = queues.recv().await.ok_or("recv 1 failed")?.1;
    let second = queues.recv().await.ok_or("recv 2 failed")?.1;
    let third = queues.recv().await.ok_or("recv 3 failed")?.1;
    Ok((first, second, third))
}
/// Function-pointer signature shared by the priority-specific push wrappers:
/// takes a handle reference and a frame, and returns a boxed future that
/// resolves to the push result and may borrow from the handle.
type PushFn = fn(&PushHandle<u8>, u8) -> Pin<Box<dyn Future<Output = Result<(), PushError>> + '_>>;
/// Checks that the rate limit throttles pushes the same way whether they all
/// go through the high-priority or all through the low-priority method.
///
/// With a rate of 2, the first two pushes complete, a third is still pending
/// when polled once, and after the paused clock is advanced by one second a
/// fresh push completes. The pending third push is never polled again, so the
/// queues are expected to yield frames 1, 2 and 4.
#[rstest]
#[case::high(PushPriority::High)]
#[case::low(PushPriority::Low)]
#[tokio::test]
#[serial]
async fn rate_limit_symmetric_single_priority(#[case] priority: PushPriority) -> TestResult {
    time::pause();

    // Select the push method under test; both closures coerce to `PushFn`.
    let push: PushFn = match priority {
        PushPriority::High => |handle, frame| Box::pin(handle.push_high_priority(frame)),
        PushPriority::Low => |handle, frame| Box::pin(handle.push_low_priority(frame)),
    };

    let (mut queues, handle) = PushQueues::<u8>::builder()
        .high_capacity(4)
        .low_capacity(4)
        .rate(Some(2))
        .build()?;

    // The first two pushes are expected to complete without waiting.
    for frame in [1, 2] {
        push(&handle, frame).await?;
    }

    // Create the third push, let the scheduler run, then poll it exactly once.
    let mut third = push(&handle, 3);
    tokio::task::yield_now().await;
    let polled = third.as_mut().now_or_never();
    assert!(
        polled.is_none(),
        "third {priority:?} push should be pending under rate limit"
    );

    time::advance(Duration::from_secs(1)).await;
    push(&handle, 4).await?;

    let delivered = drain_three(&mut queues).await?;
    assert_eq!(delivered, (1, 2, 4), "{priority:?} frames delivered in order");
    Ok(())
}
/// Checks that high- and low-priority pushes draw on the same rate limit.
///
/// With a rate of 1, a high-priority push completes and a following
/// low-priority push is still pending when polled once. After the paused
/// clock is advanced by one second a new low-priority push completes, and the
/// queues are expected to yield frame 1 followed by frame 3.
#[rstest]
#[tokio::test]
#[serial]
async fn rate_limit_symmetric_mixed() -> TestResult {
    time::pause();
    let (mut queues, handle) = PushQueues::<u8>::builder()
        .high_capacity(4)
        .low_capacity(4)
        .rate(Some(1))
        .build()?;

    push_expect!(handle.push_high_priority(1))?;

    // Poll the low-priority push once after yielding; it must not be ready.
    let mut starved_low = handle.push_low_priority(2).boxed();
    tokio::task::yield_now().await;
    let polled = starved_low.as_mut().now_or_never();
    assert!(
        polled.is_none(),
        "low push should be pending: high already consumed the token"
    );

    time::advance(Duration::from_secs(1)).await;
    push_expect!(handle.push_low_priority(3))?;

    let first = queues.recv().await.ok_or("recv 1 failed")?.1;
    let second = queues.recv().await.ok_or("recv 2 failed")?.1;
    assert_eq!(first, 1, "first frame should be the high push");
    assert_eq!(second, 3, "second frame should be the low push after refill");
    Ok(())
}
/// Checks the output order when `max_high_before_low` is 3 and no time slice
/// is configured.
///
/// Six high-priority frames (1..=6) and two low-priority frames (101, 102)
/// are pushed; the output is expected to contain one low-priority frame after
/// each run of three high-priority frames.
#[rstest]
#[tokio::test]
#[serial]
async fn interleaved_fairness_yields_at_threshold() -> TestResult {
    let fairness = FairnessConfig {
        max_high_before_low: 3,
        time_slice: None,
    };

    run_actor_test(
        fairness,
        |handle| async move {
            for frame in 1..=6 {
                push_expect!(handle.push_high_priority(frame))?;
            }
            for frame in [101, 102] {
                push_expect!(handle.push_low_priority(frame))?;
            }
            Ok(())
        },
        |out| {
            let expected: Vec<u8> = vec![1, 2, 3, 101, 4, 5, 6, 102];
            assert_eq!(
                out, expected,
                "low-priority frames should be interleaved every 3 high frames"
            );
        },
    )
    .await
}
/// Checks that no frame is lost when `max_high_before_low` is 2 and no time
/// slice is configured.
///
/// Five high-priority frames (1..=5) and five low-priority frames (101..=105)
/// are pushed. The test ignores ordering and only verifies that the output
/// has ten frames and contains every frame of each group.
#[rstest]
#[tokio::test]
#[serial]
async fn interleaved_all_frames_delivered() -> TestResult {
    let fairness = FairnessConfig {
        max_high_before_low: 2,
        time_slice: None,
    };

    run_actor_test(
        fairness,
        |handle| async move {
            for frame in 1..=5 {
                push_expect!(handle.push_high_priority(frame))?;
            }
            for frame in 101..=105 {
                push_expect!(handle.push_low_priority(frame))?;
            }
            Ok(())
        },
        |out| {
            assert_eq!(out.len(), 10, "all 10 frames should be delivered");

            // Sort once; filtering a sorted sequence keeps each group sorted.
            let mut sorted = out;
            sorted.sort_unstable();
            let high_frames: Vec<u8> = sorted.iter().copied().filter(|&f| f <= 5).collect();
            let low_frames: Vec<u8> = sorted.iter().copied().filter(|&f| f >= 101).collect();

            assert_eq!(high_frames, vec![1, 2, 3, 4, 5], "all high frames present");
            assert_eq!(
                low_frames,
                vec![101, 102, 103, 104, 105],
                "all low frames present"
            );
        },
    )
    .await
}
/// Checks that a low-priority frame is emitted mid-stream when only a time
/// slice (10 ms) is configured and `max_high_before_low` is 0.
///
/// The actor runs in a spawned task while the test pushes two high-priority
/// frames, advancing the paused clock by 5 ms and then 15 ms, followed by one
/// low-priority frame (42) and three more high-priority frames. After the
/// handle is dropped and the task finishes, frame 42 must be present and be
/// neither the first nor the last frame in the output.
#[rstest]
#[tokio::test]
#[serial]
async fn interleaved_time_slice_fairness(shutdown_token: CancellationToken) -> TestResult {
    time::pause();
    let (queues, handle) = PushQueues::<u8>::builder()
        .high_capacity(8)
        .low_capacity(8)
        .unlimited()
        .build()?;

    let mut actor: ConnectionActor<_, ()> =
        ConnectionActor::new(queues, handle.clone(), None, shutdown_token);
    actor.set_fairness(FairnessConfig {
        max_high_before_low: 0,
        time_slice: Some(Duration::from_millis(10)),
    });

    // Run the actor in the background and hand back everything it wrote.
    let actor_task = tokio::spawn(async move {
        let mut frames = Vec::new();
        if let Err(e) = actor.run(&mut frames).await {
            return Err(format!("actor run failed: {e:?}"));
        }
        Ok(frames)
    });

    push_expect!(handle.push_high_priority(1))?;
    time::advance(Duration::from_millis(5)).await;
    push_expect!(handle.push_high_priority(2))?;
    time::advance(Duration::from_millis(15)).await;
    push_expect!(handle.push_low_priority(42))?;
    for frame in 3..=5 {
        push_expect!(handle.push_high_priority(frame))?;
    }
    drop(handle);

    // First `?` surfaces a join failure, second `?` surfaces an actor error.
    let joined = actor_task
        .await
        .map_err(|e| format!("actor task panicked: {e}"))?;
    let out = joined?;

    assert!(out.contains(&42), "low-priority item should be delivered");
    let pos = out
        .iter()
        .position(|x| *x == 42)
        .ok_or("value 42 should be present")?;
    // `pos` was found above, so `out` is non-empty and the subtraction is safe.
    let last_index = out.len() - 1;
    assert!(
        (1..last_index).contains(&pos),
        "low-priority item should be yielded mid-stream: pos={pos}, out={out:?}"
    );
    Ok(())
}
/// Checks that the rate limit caps the combined throughput of interleaved
/// high- and low-priority pushes.
///
/// With a rate of 4, four alternating pushes complete and a fifth is still
/// pending when polled once. After the paused clock is advanced by one second
/// two further pushes complete. The pending fifth push is never polled again,
/// so exactly the six completed frames are expected from the queues.
#[rstest]
#[tokio::test]
#[serial]
async fn rate_limit_interleaved_total_throughput() -> TestResult {
    time::pause();
    let (mut queues, handle) = PushQueues::<u8>::builder()
        .high_capacity(8)
        .low_capacity(8)
        .rate(Some(4))
        .build()?;

    // Four pushes across both priorities are expected to use up the budget.
    push_expect!(handle.push_high_priority(1))?;
    push_expect!(handle.push_low_priority(2))?;
    push_expect!(handle.push_high_priority(3))?;
    push_expect!(handle.push_low_priority(4))?;

    // Poll the fifth push once after yielding; it must not be ready.
    let mut blocked = handle.push_high_priority(5).boxed();
    tokio::task::yield_now().await;
    assert!(
        blocked.as_mut().now_or_never().is_none(),
        "5th push should be pending: all 4 tokens consumed"
    );

    time::advance(Duration::from_secs(1)).await;
    push_expect!(handle.push_high_priority(6))?;
    push_expect!(handle.push_low_priority(7))?;

    let mut out = Vec::new();
    for _ in 0..6 {
        let (_, frame) = queues.recv().await.ok_or("recv failed")?;
        out.push(frame);
    }

    // The loop above either collects six items or returns early, so checking
    // the length alone would prove nothing. Compare the contents instead,
    // ignoring the order in which the two priorities are drained; this also
    // confirms the still-pending frame 5 was not delivered.
    out.sort_unstable();
    assert_eq!(
        out,
        vec![1, 2, 3, 4, 6, 7],
        "all 6 accepted frames should be delivered"
    );
    Ok(())
}
/// Checks the output order when `max_high_before_low` is 0 and no time slice
/// is configured, the setup this test treats as fairness being disabled.
///
/// Two low-priority frames are pushed before three high-priority frames; the
/// output is expected to list every high-priority frame before any
/// low-priority frame.
#[rstest]
#[tokio::test]
#[serial]
async fn fairness_disabled_strict_priority() -> TestResult {
    let fairness = FairnessConfig {
        max_high_before_low: 0,
        time_slice: None,
    };

    run_actor_test(
        fairness,
        |handle| async move {
            for frame in [101, 102] {
                push_expect!(handle.push_low_priority(frame))?;
            }
            for frame in 1..=3 {
                push_expect!(handle.push_high_priority(frame))?;
            }
            Ok(())
        },
        |out| {
            let expected: Vec<u8> = vec![1, 2, 3, 101, 102];
            assert_eq!(
                out, expected,
                "all high frames should precede all low frames"
            );
        },
    )
    .await
}