use bytes::Bytes;
use std::fmt::Debug;
use tokio::sync::{
broadcast,
mpsc::{self, error::TrySendError},
};
mod reliable;
pub(crate) mod tasks;
mod unreliable;
pub use reliable::DEFAULT_MAX_RELIABLE_FRAME_LEN;
use super::error::{AsyncChannelError, ChannelCloseError, ChannelConfigError};
/// Identifier of a channel; a single byte.
pub type ChannelId = u8;
/// Number of distinct [`ChannelId`] values (`u8::MAX + 1`, i.e. 256). Used as the upper bound on
/// how many channels a [`SendChannelsConfiguration`] may hold.
pub const MAX_CHANNEL_COUNT: usize = u8::MAX as usize + 1;
/// Size, in bytes, of a [`ChannelId`].
pub(crate) const CHANNEL_ID_LEN: usize = size_of::<ChannelId>();
/// Length of the protocol header; currently equal to [`CHANNEL_ID_LEN`].
// NOTE(review): presumably the header prepended to each payload is just the channel id — confirm
// against the code that reads/writes it.
pub(crate) const PROTOCOL_HEADER_LEN: usize = CHANNEL_ID_LEN;
/// Sending half of the broadcast channel that carries a [`CloseReason`].
pub(crate) type CloseSend = broadcast::Sender<CloseReason>;
/// Receiving half of the broadcast channel that carries a [`CloseReason`].
pub(crate) type CloseRecv = broadcast::Receiver<CloseReason>;
/// Reason attached to a close notification sent over [`CloseSend`] / [`CloseRecv`].
#[derive(Clone, Copy, PartialEq)]
pub(crate) enum CloseReason {
    // NOTE(review): presumably "the local side ordered the close" — confirm at the send sites.
    LocalOrder,
    // NOTE(review): presumably "the remote peer closed" — confirm at the send sites.
    PeerClosed,
}
/// Kind of a channel, together with the settings specific to that kind.
///
/// The two reliable kinds carry a `max_frame_size`; the unreliable kind has no settings.
// NOTE(review): the exact delivery guarantees of each kind are implemented outside this block
// (presumably in the `reliable` / `unreliable` modules) — confirm there before relying on them.
#[derive(Clone, Copy, Debug)]
pub enum ChannelConfig {
    /// Ordered reliable channel.
    OrderedReliable {
        /// Frame size setting for this channel.
        // NOTE(review): presumably an upper bound in bytes — confirm in the `reliable` module.
        max_frame_size: usize,
    },
    /// Unordered reliable channel.
    UnorderedReliable {
        /// Frame size setting for this channel.
        // NOTE(review): presumably an upper bound in bytes — confirm in the `reliable` module.
        max_frame_size: usize,
    },
    /// Unreliable channel.
    Unreliable,
}
impl Default for ChannelConfig {
fn default() -> Self {
ChannelConfig::default_ordered_reliable()
}
}
/// `const` constructors for each channel kind, using the default settings.
impl ChannelConfig {
    /// Returns [`ChannelConfig::Unreliable`].
    pub const fn default_unreliable() -> Self {
        Self::Unreliable
    }

    /// Returns [`ChannelConfig::UnorderedReliable`] with `max_frame_size` set to
    /// `DEFAULT_MAX_RELIABLE_FRAME_LEN`.
    pub const fn default_unordered_reliable() -> Self {
        Self::UnorderedReliable {
            max_frame_size: DEFAULT_MAX_RELIABLE_FRAME_LEN,
        }
    }

    /// Returns [`ChannelConfig::OrderedReliable`] with `max_frame_size` set to
    /// `DEFAULT_MAX_RELIABLE_FRAME_LEN`.
    pub const fn default_ordered_reliable() -> Self {
        Self::OrderedReliable {
            max_frame_size: DEFAULT_MAX_RELIABLE_FRAME_LEN,
        }
    }
}
/// Crate-internal message type; currently has a single variant.
// NOTE(review): the "Async" in the name presumably refers to which side produces the message
// (the async tasks) — confirm at the send/receive sites.
#[derive(Debug)]
pub(crate) enum ChannelAsyncMessage {
    // NOTE(review): presumably signals that the underlying connection was lost — confirm.
    LostConnection,
}
/// Crate-internal message type; currently has a single variant.
// NOTE(review): the "Sync" in the name presumably refers to which side produces the message
// (the synchronous API) — confirm at the send/receive sites.
#[derive(Debug)]
pub(crate) enum ChannelSyncMessage {
    /// Carries everything describing one channel: its id, its configuration, and the receiving
    /// halves of two `mpsc` queues.
    CreateChannel {
        /// Id of the channel.
        id: ChannelId,
        /// Configuration of the channel.
        config: ChannelConfig,
        /// Receiving half of an `mpsc` queue of [`Bytes`].
        // NOTE(review): presumably paired with `Channel::sender` — confirm at the creation site.
        bytes_to_channel_recv: mpsc::Receiver<Bytes>,
        /// Receiving half of an `mpsc` queue of `()` signals.
        // NOTE(review): presumably paired with `Channel::close_sender` — confirm at the creation site.
        channel_close_recv: mpsc::Receiver<()>,
    },
}
/// Handle to one channel: its id plus the sending halves of two `mpsc` queues, one for payloads
/// and one for close signals.
#[derive(Debug)]
pub(crate) struct Channel {
    /// Id of this channel.
    id: ChannelId,
    /// Queue that `send_payload` pushes payloads into.
    sender: mpsc::Sender<Bytes>,
    /// Queue that `close` pushes a `()` signal into.
    close_sender: mpsc::Sender<()>,
}
impl Channel {
    /// Builds a channel handle from its id and the sending halves of its payload and close queues.
    pub(crate) fn new(
        id: ChannelId,
        sender: mpsc::Sender<Bytes>,
        close_sender: mpsc::Sender<()>,
    ) -> Self {
        Self {
            id,
            sender,
            close_sender,
        }
    }

    /// Returns the id of this channel.
    pub fn id(&self) -> ChannelId {
        self.id
    }

    /// Pushes `payload` onto the payload queue without waiting (`try_send`).
    ///
    /// # Errors
    ///
    /// * [`AsyncChannelError::FullQueue`] when the queue has no free capacity.
    /// * [`AsyncChannelError::InternalChannelClosed`] when the receiving half of the queue is
    ///   closed.
    ///
    /// In both cases the payload is dropped.
    pub(crate) fn send_payload(&self, payload: Bytes) -> Result<(), AsyncChannelError> {
        self.sender.try_send(payload).map_err(|err| match err {
            TrySendError::Full(_) => AsyncChannelError::FullQueue,
            TrySendError::Closed(_) => AsyncChannelError::InternalChannelClosed,
        })
    }

    /// Pushes a close signal onto the close queue, blocking the current thread until there is
    /// room for it (`blocking_send`).
    ///
    /// # Errors
    ///
    /// [`ChannelCloseError::ChannelAlreadyClosed`] when the send fails, which for a tokio `mpsc`
    /// sender means the receiving half has been closed or dropped.
    ///
    /// # Panics
    ///
    /// Per tokio's documentation, `blocking_send` panics when called from within an asynchronous
    /// execution context, so this must only be called from synchronous code.
    pub(crate) fn close(&self) -> Result<(), ChannelCloseError> {
        self.close_sender
            .blocking_send(())
            .map_err(|_| ChannelCloseError::ChannelAlreadyClosed)
    }
}
/// Ordered list of channel configurations.
///
/// The position of a configuration in the list is its channel id: `add` returns the index of the
/// configuration it just pushed as a `ChannelId`.
#[derive(Clone, Debug)]
pub struct SendChannelsConfiguration {
    /// Channel configurations, indexed by channel id.
    channels: Vec<ChannelConfig>,
}
impl Default for SendChannelsConfiguration {
    /// Returns a configuration holding exactly one channel: the default ordered reliable one.
    ///
    /// Note that this differs from [`SendChannelsConfiguration::new`], which starts empty.
    fn default() -> Self {
        Self {
            // Reuse the shared constructor rather than repeating its literal, so this default
            // cannot drift from `ChannelConfig::default_ordered_reliable`.
            channels: vec![ChannelConfig::default_ordered_reliable()],
        }
    }
}
impl SendChannelsConfiguration {
    /// Creates a configuration with no channels.
    ///
    /// Note that this differs from the `Default` implementation, which contains one channel.
    pub fn new() -> Self {
        Self {
            channels: Vec::new(),
        }
    }

    /// Creates a configuration from an existing list of channel configurations.
    ///
    /// # Errors
    ///
    /// [`ChannelConfigError::MaxChannelsCountReached`] when `channel_types` holds more than
    /// `MAX_CHANNEL_COUNT` entries.
    pub fn from_configs(
        channel_types: Vec<ChannelConfig>,
    ) -> Result<SendChannelsConfiguration, ChannelConfigError> {
        if channel_types.len() > MAX_CHANNEL_COUNT {
            return Err(ChannelConfigError::MaxChannelsCountReached);
        }
        Ok(Self {
            channels: channel_types,
        })
    }

    /// Appends a channel configuration and returns the id assigned to it, which is its index in
    /// the list.
    ///
    /// Returns `None`, leaving the configuration untouched, when it already holds
    /// `MAX_CHANNEL_COUNT` channels.
    pub fn add(&mut self, channel_type: ChannelConfig) -> Option<ChannelId> {
        let next_index = self.channels.len();
        if next_index >= MAX_CHANNEL_COUNT {
            return None;
        }
        self.channels.push(channel_type);
        // The guard above keeps `next_index` below `MAX_CHANNEL_COUNT`, so the cast cannot
        // truncate as long as that constant does not exceed the number of `ChannelId` values.
        Some(next_index as ChannelId)
    }

    /// Returns the channel configurations, indexed by channel id.
    pub(crate) fn configs(&self) -> &Vec<ChannelConfig> {
        &self.channels
    }
}