use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use vox_types::{ChannelSink, CreditSink, Metadata, Payload, Tx, TxError};
/// Test sink that holds a shared counter of sends.
struct ImmediateSink {
    /// Shared counter; a second handle to it is handed out by `new`.
    send_count: Arc<AtomicUsize>,
}

impl ImmediateSink {
    /// Builds a sink whose counter starts at zero.
    ///
    /// Returns the sink together with a second handle to the same counter,
    /// so the caller can observe increments made through the sink.
    fn new() -> (Self, Arc<AtomicUsize>) {
        let counter = Arc::new(AtomicUsize::new(0));
        let sink = Self {
            send_count: Arc::clone(&counter),
        };
        (sink, counter)
    }
}
impl ChannelSink for ImmediateSink {
fn send_payload<'a>(
&self,
payload: Payload<'a>,
) -> std::pin::Pin<Box<dyn vox_types::MaybeSendFuture<Output = Result<(), TxError>> + 'a>> {
let count = self.send_count.clone();
Box::pin(async move {
let _ = payload;
count.fetch_add(1, Ordering::SeqCst);
Ok(())
})
}
fn close_channel(
&self,
_metadata: Metadata,
) -> std::pin::Pin<Box<dyn vox_types::MaybeSendFuture<Output = Result<(), TxError>> + 'static>>
{
Box::pin(async { Ok(()) })
}
}
/// A send through a `CreditSink` constructed with `0` must stay pending for a
/// 50 ms observation window, then complete once a single permit is added.
#[tokio::test]
async fn credit_blocks_at_zero() {
    let (sink, _sent) = ImmediateSink::new();
    let gate = Arc::new(CreditSink::new(sink, 0));
    let mut tx = Tx::<String>::unbound();
    tx.bind(gate.clone());

    let pending = tx.send("hello".to_string());
    tokio::pin!(pending);

    // If the send resolves before the window elapses, the gate did not hold.
    let window = std::time::Duration::from_millis(50);
    if let Ok(res) = tokio::time::timeout(window, &mut pending).await {
        panic!("send completed with zero credit: {res:?}");
    }

    gate.credit().add_permits(1);
    pending
        .await
        .expect("send should succeed after credit grant");
}
/// With `4` passed to `CreditSink::new`, four sends complete (counter reads 4)
/// and a fifth stays pending for 50 ms. Adding 2 permits lets that send and
/// one more finish (counter reads 6); a further send then stays pending again.
#[tokio::test]
async fn credit_allows_n_sends() {
    let (sink, sent) = ImmediateSink::new();
    let gate = Arc::new(CreditSink::new(sink, 4));
    let mut tx = Tx::<String>::unbound();
    tx.bind(gate.clone());

    // How long a send must remain pending to count as "blocked".
    let window = std::time::Duration::from_millis(50);

    for i in 0..4 {
        let outcome = tx.send(format!("msg-{i}")).await;
        outcome.expect("should have credit");
    }
    assert_eq!(sent.load(Ordering::SeqCst), 4);

    let fifth = tx.send("blocked".to_string());
    tokio::pin!(fifth);
    if let Ok(res) = tokio::time::timeout(window, &mut fifth).await {
        panic!("5th send should block: {res:?}");
    }

    // Two permits: one releases the pending send, one covers the next send.
    gate.credit().add_permits(2);
    fifth.await.expect("send should succeed after grant");
    let sixth = tx.send("also-ok".to_string()).await;
    sixth.expect("should have one more credit");
    assert_eq!(sent.load(Ordering::SeqCst), 6);

    let seventh = tx.send("blocked-again".to_string());
    tokio::pin!(seventh);
    if let Ok(res) = tokio::time::timeout(window, &mut seventh).await {
        panic!("should block again: {res:?}");
    }
}
/// Two separate grants of 3 and 5 permits on a `CreditSink` constructed with
/// `0` must allow eight sends in total; a ninth stays pending for 50 ms.
#[tokio::test]
async fn credit_grants_are_additive() {
    let (sink, sent) = ImmediateSink::new();
    let gate = Arc::new(CreditSink::new(sink, 0));
    let mut tx = Tx::<String>::unbound();
    tx.bind(gate.clone());

    // Grant in two steps before any send is attempted.
    gate.credit().add_permits(3);
    gate.credit().add_permits(5);

    for i in 0..8 {
        let outcome = tx.send(format!("msg-{i}")).await;
        outcome.expect("should have credit");
    }
    assert_eq!(sent.load(Ordering::SeqCst), 8);

    let ninth = tx.send("blocked".to_string());
    tokio::pin!(ninth);
    let window = std::time::Duration::from_millis(50);
    if let Ok(res) = tokio::time::timeout(window, &mut ninth).await {
        panic!("9th send should block: {res:?}");
    }
}
/// With `0` passed to `CreditSink::new`, the very first send must stay pending
/// for 50 ms and only complete after one permit is explicitly added.
#[tokio::test]
async fn credit_initial_zero_blocks_first_send() {
    let (sink, _sent) = ImmediateSink::new();
    let gate = Arc::new(CreditSink::new(sink, 0));
    let mut tx = Tx::<String>::unbound();
    tx.bind(gate.clone());

    let first = tx.send("first".to_string());
    tokio::pin!(first);

    // A result inside the window means the send was not held back.
    let window = std::time::Duration::from_millis(50);
    if let Ok(res) = tokio::time::timeout(window, &mut first).await {
        panic!("N=0 should block first send: {res:?}");
    }

    gate.credit().add_permits(1);
    first.await.expect("should succeed after explicit grant");
}