#![warn(
missing_docs,
missing_debug_implementations,
missing_copy_implementations,
trivial_casts,
trivial_numeric_casts,
unreachable_pub,
unsafe_code,
unstable_features,
unused_import_braces,
unused_qualifications
)]
mod error;
mod halt;
mod halves_stream;
mod id_gen;
mod multiplexer;
mod multiplexer_senders;
mod send_all_own;
mod sender;
mod stream_mover;
pub use error::*;
use halt::*;
pub use halves_stream::*;
pub use id_gen::*;
pub use multiplexer::*;
use multiplexer_senders::*;
use send_all_own::*;
use sender::*;
use stream_mover::*;
use std::iter::FromIterator;
/// Identifier used to refer to an individual stream in incoming and outgoing packets.
type StreamId = usize;
/// Identifier of a channel, as carried by `DisconnectReason::ChannelChange` and
/// `OutgoingPacket::ChangeChannel`.
type ChannelId = usize;
/// A value paired with the id of the stream it is associated with.
///
/// This is the payload of [`IncomingPacket::Message`].
pub struct IncomingMessage<V> {
/// Id of the stream this message is associated with (presumably the stream it was
/// read from; confirm against the multiplexer).
pub stream_id: StreamId,
/// The message payload.
pub value: V,
}
// Only the stream id is printed (under the label `id`); the payload is left
// out, which is why `V` does not need to implement `Debug`.
impl<V> std::fmt::Debug for IncomingMessage<V> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("IncomingMessage");
        builder.field("id", &self.stream_id);
        builder.finish()
    }
}
impl<V> IncomingMessage<V> {
pub fn new(stream_id: StreamId, value: V) -> Self {
Self { stream_id, value }
}
}
/// Reason attached to [`IncomingPacket::StreamDisconnected`].
#[derive(Copy, Clone, PartialEq, Debug)]
pub enum DisconnectReason {
/// The stream went away without a channel change (presumably a normal end of
/// stream or shutdown; confirm against the multiplexer).
Graceful,
/// Carries a channel id; presumably the stream left because it was moved to that
/// channel (confirm against the multiplexer).
ChannelChange(ChannelId),
}
/// An event reported for a stream: a connection, a disconnection, or a message.
///
/// Every variant carries the id of the stream it refers to; see `IncomingPacket::id`.
pub enum IncomingPacket<V> {
/// The stream with the given id has connected.
StreamConnected(StreamId),
/// The stream with the given id has disconnected, for the stated reason.
StreamDisconnected(StreamId, DisconnectReason),
/// A message associated with a stream.
Message(IncomingMessage<V>),
}
impl<V> IncomingPacket<V> {
    /// Returns the id of the stream this packet refers to, whatever its variant.
    pub fn id(&self) -> StreamId {
        match self {
            IncomingPacket::StreamConnected(stream_id)
            | IncomingPacket::StreamDisconnected(stream_id, _) => *stream_id,
            IncomingPacket::Message(message) => message.stream_id,
        }
    }

    /// Returns a reference to the payload when this is a `Message` packet,
    /// and `None` for the connection and disconnection variants.
    pub fn value(&self) -> Option<&V> {
        if let IncomingPacket::Message(message) = self {
            Some(&message.value)
        } else {
            None
        }
    }
}
impl<V> From<IncomingMessage<V>> for IncomingPacket<V> {
fn from(message: IncomingMessage<V>) -> Self {
Self::Message(message)
}
}
// Hand-written so that `V` does not need to implement `Debug`: each variant is
// printed as `IncomingPacket::<Variant>(...)`, and a message is rendered through
// `IncomingMessage`'s own `Debug` impl.
impl<V> std::fmt::Debug for IncomingPacket<V> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            IncomingPacket::StreamConnected(id) => {
                write!(f, "IncomingPacket::StreamConnected({})", id)
            }
            IncomingPacket::StreamDisconnected(id, reason) => {
                // The label must spell the variant name exactly.
                write!(f, "IncomingPacket::StreamDisconnected({}, {:?})", id, reason)
            }
            IncomingPacket::Message(message) => {
                // Labelled with the variant name (`Message`), matching how the
                // other variants and `OutgoingPacket` are printed.
                write!(f, "IncomingPacket::Message({:?})", message)
            }
        }
    }
}
/// A collection of values together with a collection of stream ids.
///
/// Ids and values are each held in a `tinyvec::TinyVec` backed by a 16-element array
/// of `Option`s; `OutgoingMessage::new` stores every entry as `Some`.
///
/// NOTE(review): presumably the values are what gets sent and the ids name the target
/// streams, with the `Option` wrapper letting entries be taken out individually while
/// sending. Confirm against the sender code.
#[derive(Clone)]
pub struct OutgoingMessage<V> {
// Ids of the streams this message refers to.
stream_ids: tinyvec::TinyVec<[Option<StreamId>; 16]>,
// Payload values carried by this message.
values: tinyvec::TinyVec<[Option<V>; 16]>,
}
impl<V> OutgoingMessage<V> {
pub fn new(
stream_ids: impl IntoIterator<Item = StreamId>,
values: impl IntoIterator<Item = V>,
) -> Self {
let stream_ids = tinyvec::TinyVec::from_iter(stream_ids.into_iter().map(Some));
let values = tinyvec::TinyVec::from_iter(values.into_iter().map(Some));
Self { stream_ids, values }
}
}
// Only the stream ids are printed (under the label `ids`); the values are left
// out, which is why `V` does not need to implement `Debug`.
impl<V> std::fmt::Debug for OutgoingMessage<V> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("OutgoingMessage");
        builder.field("ids", &self.stream_ids);
        builder.finish()
    }
}
// Explicitly marks `OutgoingMessage<V>` as `Unpin` whenever its value type `V` is `Unpin`.
impl<V> Unpin for OutgoingMessage<V> where V: Unpin {}
/// A request addressed to one or more streams.
pub enum OutgoingPacket<V> {
/// Carries an [`OutgoingMessage`], which holds values and stream ids.
Message(OutgoingMessage<V>),
/// Names a list of stream ids and a channel id; presumably asks for those streams to
/// be moved to that channel (confirm against the multiplexer).
ChangeChannel(Vec<StreamId>, ChannelId),
/// Names a list of stream ids; presumably asks for those streams to be shut down
/// (confirm against the multiplexer).
Shutdown(Vec<StreamId>),
}
impl<V> std::fmt::Debug for OutgoingPacket<V> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
OutgoingPacket::Message(message) => write!(f, "OutgoingPacket::Message({:?})", message),
OutgoingPacket::ChangeChannel(ids, channel) => {
write!(f, "OutgoingPacket::ChangeChannel({:?}, {})", ids, channel)
}
OutgoingPacket::Shutdown(ids) => write!(f, "OutgoingPacket::Shutdown({:?})", ids),
}
}
}
impl<V> From<OutgoingMessage<V>> for OutgoingPacket<V> {
fn from(message: OutgoingMessage<V>) -> Self {
Self::Message(message)
}
}
/// Control signal with a single variant.
///
/// NOTE(review): where this is produced and consumed is not visible in this file.
#[derive(Copy, Clone, PartialEq, Debug)]
pub enum ControlMessage {
/// Requests a shutdown (what exactly is shut down is not visible here; confirm at the
/// use site).
Shutdown,
}
#[cfg(test)]
mod tests {
use super::*;
use futures::prelude::*;
/// Installs a TRACE-level `tracing` formatter subscriber as the global default.
///
/// Safe to call from any number of tests: only the first call in a process
/// installs the subscriber, later calls are no-ops.
// `dead_code` is allowed because this is a debugging aid that may have no callers.
#[allow(dead_code)]
pub(crate) fn init_logging() {
    use tracing_subscriber::FmtSubscriber;

    let subscriber = FmtSubscriber::builder()
        .with_max_level(tracing::Level::TRACE)
        .finish();
    // The global default can only be set once per process, and all tests in a
    // binary share that process. A second call returns an error, which is
    // ignored on purpose: a subscriber is already in place, and panicking
    // here would fail whichever test happened to call this second.
    let _ = tracing::subscriber::set_global_default(subscriber);
}
/// Wraps `stream` with `HaltRead::wrap` and pairs the resulting reader with a
/// `Sender` built from `sink` and the halt handle that `wrap` returned.
pub(crate) fn sender_reader<St, Si>(sink: Si, stream: St) -> (Sender<Si>, HaltAsyncRead<St>)
where
    St: Stream + Unpin,
    Si: Unpin,
{
    let (halt_handle, halting_reader) = HaltRead::wrap(stream);
    (Sender::new(sink, halt_handle), halting_reader)
}
}