pub mod aggregator;
pub mod error;
#[cfg(any(test, feature = "test-internals"))]
pub mod half_close_conformance_tests;
#[cfg(any(test, feature = "test-internals"))]
pub mod mock;
pub mod router;
pub mod sink;
pub mod stream;
mod tests;
pub use aggregator::{
AggregatorConfig, AggregatorStats, DeduplicatorConfig, DeduplicatorStats,
ExperimentalTransportDowngradeReason, ExperimentalTransportGate, MultipathAggregator,
PathCharacteristics, PathId, PathSelectionDecision, PathSelectionDowngradeReason,
PathSelectionPolicy, PathSet, PathSetStats, PathState, ProcessResult, ReordererConfig,
ReordererStats, SymbolDeduplicator, SymbolReorderer, TransportCodingPolicy,
TransportExperimentContext, TransportExperimentDecision, TransportPath,
};
pub use error::{SinkError, StreamError};
#[cfg(any(test, feature = "test-internals"))]
pub use mock::{
NodeId, SimChannelSink, SimChannelStream, SimLink, SimNetwork, SimSymbolSink, SimSymbolStream,
SimTransportConfig, sim_channel,
};
pub use router::{
DispatchConfig, DispatchError, DispatchResult, DispatchStrategy, Endpoint, EndpointId,
EndpointState, LoadBalanceStrategy, LoadBalancer, RouteKey, RouteResult, RoutingEntry,
RoutingError, RoutingTable, SymbolDispatcher, SymbolRouter,
};
pub use sink::{SymbolSink, SymbolSinkExt};
pub use stream::{SymbolStream, SymbolStreamExt};
use crate::security::authenticated::AuthenticatedSymbol;
use crate::types::Symbol;
use parking_lot::Mutex;
use smallvec::SmallVec;
use std::collections::{HashSet, VecDeque};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::Waker;
pub type SymbolSet = HashSet<Symbol>;
#[derive(Debug)]
pub(crate) struct ChannelWaiter {
pub waker: Waker,
pub queued: Arc<AtomicBool>,
}
#[derive(Debug)]
pub(crate) struct SharedChannel {
pub queue: Mutex<VecDeque<AuthenticatedSymbol>>,
pub capacity: usize,
pub send_wakers: Mutex<SmallVec<[ChannelWaiter; 2]>>,
pub recv_wakers: Mutex<SmallVec<[ChannelWaiter; 2]>>,
pub closed: AtomicBool,
}
impl SharedChannel {
pub(crate) fn new(capacity: usize) -> Self {
assert!(capacity > 0, "transport::channel capacity must be > 0");
Self {
queue: Mutex::new(VecDeque::new()),
capacity,
send_wakers: Mutex::new(SmallVec::new()),
recv_wakers: Mutex::new(SmallVec::new()),
closed: AtomicBool::new(false),
}
}
pub(crate) fn close(&self) {
self.closed.store(true, Ordering::Release);
let send_wakers = {
let mut wakers = self.send_wakers.lock();
std::mem::take(&mut *wakers)
};
let recv_wakers = {
let mut wakers = self.recv_wakers.lock();
std::mem::take(&mut *wakers)
};
for w in send_wakers {
w.queued.store(false, Ordering::Release);
w.waker.wake();
}
for w in recv_wakers {
w.queued.store(false, Ordering::Release);
w.waker.wake();
}
}
}
#[must_use]
pub fn channel(capacity: usize) -> (sink::ChannelSink, stream::ChannelStream) {
let shared = Arc::new(SharedChannel::new(capacity));
(
sink::ChannelSink::new(shared.clone()),
stream::ChannelStream::new(shared),
)
}
#[cfg(test)]
mod inline_tests {
use super::*;
use crate::security::authenticated::AuthenticatedSymbol;
use crate::security::tag::AuthenticationTag;
use crate::transport::{SymbolSinkExt, SymbolStreamExt};
use crate::types::{Symbol, SymbolId, SymbolKind};
use futures_lite::future;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Wake, Waker};
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
fn create_symbol(esi: u32) -> AuthenticatedSymbol {
let id = SymbolId::new_for_test(1, 0, esi);
let symbol = Symbol::new(id, vec![esi as u8], SymbolKind::Source);
let tag = AuthenticationTag::zero();
AuthenticatedSymbol::new_verified(symbol, tag)
}
struct FlagWake {
flag: Arc<AtomicBool>,
}
impl Wake for FlagWake {
fn wake(self: Arc<Self>) {
self.flag.store(true, Ordering::SeqCst);
}
}
fn flagged_waker(flag: Arc<AtomicBool>) -> Waker {
Waker::from(Arc::new(FlagWake { flag }))
}
#[test]
fn test_channel_round_trip_and_close() {
init_test("test_channel_round_trip_and_close");
let (mut sink, mut stream) = channel(2);
let s1 = create_symbol(1);
let s2 = create_symbol(2);
future::block_on(async {
sink.send(s1.clone()).await.unwrap();
sink.send(s2.clone()).await.unwrap();
sink.close().await.unwrap();
let r1 = stream.next().await.unwrap().unwrap();
let r2 = stream.next().await.unwrap().unwrap();
crate::assert_with_log!(r1 == s1, "first symbol", true, r1 == s1);
crate::assert_with_log!(r2 == s2, "second symbol", true, r2 == s2);
crate::assert_with_log!(stream.next().await.is_none(), "stream closed", true, true);
});
crate::test_complete!("test_channel_round_trip_and_close");
}
#[test]
fn test_shared_channel_close_wakes_waiters() {
init_test("test_shared_channel_close_wakes_waiters");
let shared = SharedChannel::new(1);
let send_flag = Arc::new(AtomicBool::new(false));
let recv_flag = Arc::new(AtomicBool::new(false));
let send_queued = Arc::new(AtomicBool::new(true));
let recv_queued = Arc::new(AtomicBool::new(true));
{
let mut send_wakers = shared.send_wakers.lock();
send_wakers.push(ChannelWaiter {
waker: flagged_waker(Arc::clone(&send_flag)),
queued: Arc::clone(&send_queued),
});
}
{
let mut recv_wakers = shared.recv_wakers.lock();
recv_wakers.push(ChannelWaiter {
waker: flagged_waker(Arc::clone(&recv_flag)),
queued: Arc::clone(&recv_queued),
});
}
shared.close();
crate::assert_with_log!(
shared.closed.load(Ordering::SeqCst),
"closed flag set",
true,
shared.closed.load(Ordering::SeqCst)
);
crate::assert_with_log!(
!send_queued.load(Ordering::Acquire),
"send queued cleared",
false,
send_queued.load(Ordering::Acquire)
);
crate::assert_with_log!(
!recv_queued.load(Ordering::Acquire),
"recv queued cleared",
false,
recv_queued.load(Ordering::Acquire)
);
crate::assert_with_log!(
send_flag.load(Ordering::Acquire),
"send waker fired",
true,
send_flag.load(Ordering::Acquire)
);
crate::assert_with_log!(
recv_flag.load(Ordering::Acquire),
"recv waker fired",
true,
recv_flag.load(Ordering::Acquire)
);
crate::test_complete!("test_shared_channel_close_wakes_waiters");
}
#[test]
#[should_panic(expected = "transport::channel capacity must be > 0")]
fn test_channel_zero_capacity_panics() {
let _ = channel(0);
}
}