use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
use vox_types::{
ChannelDebugContext, ChannelEvent, ChannelId, ChannelSink, ChannelTrySendOutcome, ConnectionId,
CreditSink, Metadata, Payload, TrySendError, Tx, TxError, VoxObserver, VoxObserverHandle,
};
struct ImmediateSink {
send_count: Arc<AtomicUsize>,
}
impl ImmediateSink {
fn new() -> (Self, Arc<AtomicUsize>) {
let count = Arc::new(AtomicUsize::new(0));
(
Self {
send_count: count.clone(),
},
count,
)
}
}
impl ChannelSink for ImmediateSink {
fn send_payload<'a>(
&self,
payload: Payload<'a>,
) -> std::pin::Pin<Box<dyn vox_types::MaybeSendFuture<Output = Result<(), TxError>> + 'a>> {
let count = self.send_count.clone();
Box::pin(async move {
let _ = payload;
count.fetch_add(1, Ordering::SeqCst);
Ok(())
})
}
fn try_send_payload<'a>(&self, payload: Payload<'a>) -> Result<(), TrySendError<()>> {
let _ = payload;
self.send_count.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn close_channel(
&self,
_metadata: Metadata,
) -> std::pin::Pin<Box<dyn vox_types::MaybeSendFuture<Output = Result<(), TxError>> + 'static>>
{
Box::pin(async { Ok(()) })
}
}
struct RecordingObserver {
events: Arc<Mutex<Vec<ChannelEvent>>>,
}
impl VoxObserver for RecordingObserver {
fn channel_event(&self, event: ChannelEvent) {
self.events.lock().unwrap().push(event);
}
}
struct ObservedTrySendSink {
connection_id: ConnectionId,
channel_id: ChannelId,
debug_context: ChannelDebugContext,
observer: VoxObserverHandle,
outcome: Option<ChannelTrySendOutcome>,
}
impl ChannelSink for ObservedTrySendSink {
fn send_payload<'a>(
&self,
_payload: Payload<'a>,
) -> std::pin::Pin<Box<dyn vox_types::MaybeSendFuture<Output = Result<(), TxError>> + 'a>> {
Box::pin(async { Ok(()) })
}
fn channel_id(&self) -> Option<ChannelId> {
Some(self.channel_id)
}
fn connection_id(&self) -> Option<ConnectionId> {
Some(self.connection_id)
}
fn debug_context(&self) -> Option<ChannelDebugContext> {
Some(self.debug_context)
}
fn observer(&self) -> Option<VoxObserverHandle> {
Some(self.observer.clone())
}
fn try_send_payload_with_outcome<'a>(
&self,
_payload: Payload<'a>,
) -> Result<(), ChannelTrySendOutcome> {
match self.outcome {
Some(outcome) => Err(outcome),
None => Ok(()),
}
}
fn close_channel(
&self,
_metadata: Metadata,
) -> std::pin::Pin<Box<dyn vox_types::MaybeSendFuture<Output = Result<(), TxError>> + 'static>>
{
Box::pin(async { Ok(()) })
}
}
#[test]
fn try_send_returns_full_with_value_when_credit_is_exhausted() {
let (inner, count) = ImmediateSink::new();
let credit_sink = Arc::new(CreditSink::new(inner, 1));
let mut tx = Tx::<String>::unbound();
tx.bind(credit_sink);
tx.try_send("first".to_string())
.expect("first send should use initial credit");
assert_eq!(count.load(Ordering::SeqCst), 1);
match tx.try_send("second".to_string()) {
Err(TrySendError::Full(value)) => assert_eq!(value, "second"),
other => panic!("expected Full(second), got {other:?}"),
}
assert_eq!(count.load(Ordering::SeqCst), 1);
}
#[test]
fn try_send_returns_closed_with_value_when_credit_is_closed() {
let (inner, count) = ImmediateSink::new();
let credit_sink = Arc::new(CreditSink::new(inner, 0));
credit_sink.credit().close();
let mut tx = Tx::<String>::unbound();
tx.bind(credit_sink);
match tx.try_send("closed".to_string()) {
Err(TrySendError::Closed(value)) => assert_eq!(value, "closed"),
other => panic!("expected Closed(closed), got {other:?}"),
}
assert_eq!(count.load(Ordering::SeqCst), 0);
}
#[test]
fn observer_distinguishes_try_send_full_credit_from_runtime_queue() {
let events = Arc::new(Mutex::new(Vec::new()));
let observer: VoxObserverHandle = Arc::new(RecordingObserver {
events: events.clone(),
});
let mut credit_full_tx = Tx::<u32>::unbound();
credit_full_tx.bind(Arc::new(CreditSink::new(
ObservedTrySendSink {
connection_id: ConnectionId(5),
channel_id: ChannelId(7),
debug_context: ChannelDebugContext {
label: Some("credit-full"),
..ChannelDebugContext::default()
},
observer: observer.clone(),
outcome: None,
},
0,
)));
assert!(matches!(
credit_full_tx.try_send(1),
Err(TrySendError::Full(1))
));
let mut runtime_full_tx = Tx::<u32>::unbound();
runtime_full_tx.bind(Arc::new(CreditSink::new(
ObservedTrySendSink {
connection_id: ConnectionId(5),
channel_id: ChannelId(9),
debug_context: ChannelDebugContext {
label: Some("runtime-full"),
..ChannelDebugContext::default()
},
observer,
outcome: Some(ChannelTrySendOutcome::FullRuntimeQueue),
},
1,
)));
assert!(matches!(
runtime_full_tx.try_send(2),
Err(TrySendError::Full(2))
));
let events = events.lock().unwrap();
assert!(events.iter().any(|event| matches!(
event,
ChannelEvent::TrySend { channel, outcome: ChannelTrySendOutcome::FullCredit }
if channel.connection_id == Some(ConnectionId(5))
&& channel.channel_id == ChannelId(7)
&& channel.debug.and_then(|debug| debug.label) == Some("credit-full")
)));
assert!(events.iter().any(|event| matches!(
event,
ChannelEvent::TrySend { channel, outcome: ChannelTrySendOutcome::FullRuntimeQueue }
if channel.connection_id == Some(ConnectionId(5))
&& channel.channel_id == ChannelId(9)
&& channel.debug.and_then(|debug| debug.label) == Some("runtime-full")
)));
}
#[tokio::test]
async fn credit_blocks_at_zero() {
let (inner, _count) = ImmediateSink::new();
let credit_sink = Arc::new(CreditSink::new(inner, 0));
let mut tx = Tx::<String>::unbound();
tx.bind(credit_sink.clone());
let fut = tx.send("hello".to_string());
tokio::pin!(fut);
tokio::select! {
res = &mut fut => panic!("send completed with zero credit: {res:?}"),
_ = tokio::time::sleep(std::time::Duration::from_millis(50)) => {}
}
credit_sink.credit().add_permits(1);
fut.await.expect("send should succeed after credit grant");
}
#[tokio::test]
async fn credit_allows_n_sends() {
let (inner, count) = ImmediateSink::new();
let credit_sink = Arc::new(CreditSink::new(inner, 4));
let mut tx = Tx::<String>::unbound();
tx.bind(credit_sink.clone());
for i in 0..4 {
tx.send(format!("msg-{i}"))
.await
.expect("should have credit");
}
assert_eq!(count.load(Ordering::SeqCst), 4);
let fut = tx.send("blocked".to_string());
tokio::pin!(fut);
tokio::select! {
res = &mut fut => panic!("5th send should block: {res:?}"),
_ = tokio::time::sleep(std::time::Duration::from_millis(50)) => {}
}
credit_sink.credit().add_permits(2);
fut.await.expect("send should succeed after grant");
tx.send("also-ok".to_string())
.await
.expect("should have one more credit");
assert_eq!(count.load(Ordering::SeqCst), 6);
let fut = tx.send("blocked-again".to_string());
tokio::pin!(fut);
tokio::select! {
res = &mut fut => panic!("should block again: {res:?}"),
_ = tokio::time::sleep(std::time::Duration::from_millis(50)) => {}
}
}
#[tokio::test]
async fn credit_grants_are_additive() {
let (inner, count) = ImmediateSink::new();
let credit_sink = Arc::new(CreditSink::new(inner, 0));
let mut tx = Tx::<String>::unbound();
tx.bind(credit_sink.clone());
credit_sink.credit().add_permits(3);
credit_sink.credit().add_permits(5);
for i in 0..8 {
tx.send(format!("msg-{i}"))
.await
.expect("should have credit");
}
assert_eq!(count.load(Ordering::SeqCst), 8);
let fut = tx.send("blocked".to_string());
tokio::pin!(fut);
tokio::select! {
res = &mut fut => panic!("9th send should block: {res:?}"),
_ = tokio::time::sleep(std::time::Duration::from_millis(50)) => {}
}
}
#[tokio::test]
async fn credit_initial_zero_blocks_first_send() {
let (inner, _count) = ImmediateSink::new();
let credit_sink = Arc::new(CreditSink::new(inner, 0));
let mut tx = Tx::<String>::unbound();
tx.bind(credit_sink.clone());
let fut = tx.send("first".to_string());
tokio::pin!(fut);
tokio::select! {
res = &mut fut => panic!("N=0 should block first send: {res:?}"),
_ = tokio::time::sleep(std::time::Duration::from_millis(50)) => {}
}
credit_sink.credit().add_permits(1);
fut.await.expect("should succeed after explicit grant");
}