use std::collections::VecDeque;
use bytes::Bytes;
use crate::shared::{
channels::{ChannelId, MAX_CHANNEL_COUNT},
error::RecvChannelError,
peer_connection::PeerConnection,
};
pub const DEFAULT_MAX_BUFFERED_PAYLOADS_COUNT_PER_CHANNEL: usize = 512;
pub const DEFAULT_MAX_RECEIVE_CHANNEL_COUNT: usize = MAX_CHANNEL_COUNT;
pub const DEFAULT_CLEAR_STALE_RECEIVED_PAYLOADS: bool = false;
#[derive(Debug, Clone)]
pub struct RecvChannelsConfiguration {
pub max_buffered_payloads_count_per_channel: usize,
pub max_receive_channels_count: usize,
pub clear_stale_received_payloads: bool,
}
impl Default for RecvChannelsConfiguration {
fn default() -> Self {
Self {
max_buffered_payloads_count_per_channel:
DEFAULT_MAX_BUFFERED_PAYLOADS_COUNT_PER_CHANNEL,
max_receive_channels_count: DEFAULT_MAX_RECEIVE_CHANNEL_COUNT,
clear_stale_received_payloads: DEFAULT_CLEAR_STALE_RECEIVED_PAYLOADS,
}
}
}
impl<S> PeerConnection<S> {
#[inline(always)]
pub fn set_clear_stale_received_payloads(&mut self, enable: bool) {
self.recv_channels_cfg.clear_stale_received_payloads = enable;
}
pub(crate) fn internal_receive_payload(&mut self, channel_id: ChannelId) -> Option<Bytes> {
match self.recv_channels_payloads.get_mut(channel_id as usize) {
Some(payloads) => payloads.pop_front(),
None => None,
}
}
pub(crate) fn clear_stale_received_payloads(&mut self) {
if self.recv_channels_cfg.clear_stale_received_payloads {
self.clear_received_payloads();
}
}
pub fn clear_received_payloads(&mut self) {
for payloads in self.recv_channels_payloads.iter_mut() {
payloads.clear();
}
}
pub(crate) fn dispatch_received_payloads_to_channel_buffers(
&mut self,
) -> Result<(), Vec<RecvChannelError>> {
let mut errs = Vec::new();
while let Ok((channel_id, payload)) = self.dequeue_undispatched_bytes_from_peer() {
match self.recv_channels_payloads.get_mut(channel_id as usize) {
Some(payloads) => {
if payloads.len()
< self
.recv_channels_cfg
.max_buffered_payloads_count_per_channel
{
payloads.push_back(payload);
} else {
errs.push(RecvChannelError::RecvChannelFull(channel_id));
}
}
None => {
if (channel_id as usize) < self.recv_channels_cfg.max_receive_channels_count {
self.recv_channels_payloads.extend(
(self.recv_channels_payloads.len()..channel_id as usize)
.map(|_| VecDeque::new()),
);
self.recv_channels_payloads.push(VecDeque::from([payload]));
} else {
errs.push(RecvChannelError::MaxRecvChannelCountReached(channel_id));
}
}
}
}
match errs.is_empty() {
true => Ok(()),
false => Err(errs),
}
}
}