use std::cmp::min;
use std::time::{Duration, Instant};
/// Errors reported while updating the outgoing stream state from a remote acknowledgement.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum OutStreamError {
    /// A chunk that had already been flagged as received by the remote was
    /// reported as *not* received in a later acknowledgement.
    ChunkPreviouslyReceivedMarkedAsNotReceived,
    /// The chunk index supplied by the caller lies outside the stream.
    IndexOutOfBounds,
}
/// Send state of a single chunk in an outgoing blob stream.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct BlobStreamOutEntry {
    /// When the chunk was most recently handed out for sending; `None` if it
    /// has not been sent yet (or has been reset).
    pub last_sent_at: Option<Instant>,
    /// Index of the chunk this entry describes.
    pub index: usize,
    /// Whether the remote is recorded as having received this chunk.
    pub is_received_by_remote: bool,
}

impl BlobStreamOutEntry {
    /// Creates the entry for chunk `index`: never sent and not yet received.
    #[must_use]
    pub fn new(index: usize) -> Self {
        let last_sent_at = None;
        let is_received_by_remote = false;
        Self {
            last_sent_at,
            index,
            is_received_by_remote,
        }
    }

    /// Records `time` as the moment this chunk was last sent.
    pub fn sent_at_time(&mut self, time: Instant) {
        self.last_sent_at = Some(time);
    }
}
/// Sender-side bookkeeping for a blob that is transferred as a sequence of chunks.
///
/// Holds one [`BlobStreamOutEntry`] per chunk and decides, on every call to
/// [`BlobStreamOut::send`], which chunk indices should be transmitted next.
#[allow(unused)]
#[derive(Debug)]
pub struct BlobStreamOut {
    /// One entry per chunk; `new` creates them so that position `i` holds chunk `i`.
    pub(crate) entries: Vec<BlobStreamOutEntry>,
    /// Chunk index most recently passed to `set_waiting_for_chunk_index`;
    /// `send` starts its primary window here.
    start_index_to_send: usize,
    /// Cursor used by `send` to pick extra chunks beyond the primary window
    /// when that window yields fewer than `max_count` indices.
    index_to_start_from_if_not_filled_up: usize,
    /// Minimum time between two sends of the same chunk from the primary window.
    resend_duration: Duration,
    /// Number of entries currently flagged as received by the remote.
    chunk_count_received_by_remote: usize,
}

impl BlobStreamOut {
    /// Creates a stream of `chunk_count` chunks, none sent and none received.
    ///
    /// # Panics
    ///
    /// Panics if `chunk_count` is zero.
    #[must_use]
    pub fn new(chunk_count: usize, resend_duration: Duration) -> Self {
        assert_ne!(chunk_count, 0, "chunk_count cannot be zero");
        Self {
            entries: (0..chunk_count).map(BlobStreamOutEntry::new).collect(),
            start_index_to_send: 0,
            index_to_start_from_if_not_filled_up: 0,
            resend_duration,
            chunk_count_received_by_remote: 0,
        }
    }

    /// Total number of chunks in the stream.
    pub fn chunk_count(&self) -> usize {
        self.entries.len()
    }

    /// Applies an acknowledgement from the remote.
    ///
    /// `index` is the chunk the remote is waiting for: every chunk below it is
    /// flagged as received and the chunk at `index` is reset so it can be sent
    /// again immediately. `receive_mask` describes the (up to 64) chunks that
    /// follow: bit `n`, counted from the least significant bit, refers to chunk
    /// `index + 1 + n`. A set bit flags the chunk as received; a cleared bit
    /// resets its last-sent time.
    ///
    /// # Errors
    ///
    /// * [`OutStreamError::IndexOutOfBounds`] if `index` is greater than the
    ///   chunk count. `index == chunk_count` is accepted and flags every chunk
    ///   as received. The stream is left untouched when this error is returned.
    /// * [`OutStreamError::ChunkPreviouslyReceivedMarkedAsNotReceived`] if the
    ///   chunk at `index`, or a chunk whose mask bit is cleared, was already
    ///   flagged as received. Updates applied before the offending chunk was
    ///   reached remain in effect.
    pub fn set_waiting_for_chunk_index(
        &mut self,
        index: usize,
        receive_mask: u64,
    ) -> Result<(), OutStreamError> {
        // Validate before touching any state; slicing with an index beyond the
        // end of `entries` would otherwise panic.
        if index > self.entries.len() {
            return Err(OutStreamError::IndexOutOfBounds);
        }
        self.start_index_to_send = index;

        // Everything before the waiting index has implicitly been received.
        for entry in &mut self.entries[..index] {
            if !entry.is_received_by_remote {
                entry.is_received_by_remote = true;
                self.chunk_count_received_by_remote += 1;
            }
        }

        // `None` here means `index == chunk_count`, i.e. nothing is left to wait for.
        if let Some(waiting_for_entry) = self.entries.get_mut(index) {
            if waiting_for_entry.is_received_by_remote {
                return Err(OutStreamError::ChunkPreviouslyReceivedMarkedAsNotReceived);
            }
            waiting_for_entry.last_sent_at = None;
        }

        // The mask covers at most 64 chunks after the waiting index. When
        // `index == chunk_count`, `start > end` and the iterator is empty.
        let start = index + 1;
        let end = min(self.entries.len(), start + 64);
        for (bit, entry) in self.entries.iter_mut().take(end).skip(start).enumerate() {
            if (receive_mask >> bit) & 0b1 != 0 {
                if !entry.is_received_by_remote {
                    entry.is_received_by_remote = true;
                    self.chunk_count_received_by_remote += 1;
                }
            } else {
                if entry.is_received_by_remote {
                    return Err(OutStreamError::ChunkPreviouslyReceivedMarkedAsNotReceived);
                }
                entry.last_sent_at = None;
            }
        }

        Ok(())
    }

    /// Selects up to `max_count` chunk indices to transmit at time `now` and
    /// stamps each selected entry with `now` as its last-sent time.
    ///
    /// Selection happens in two passes:
    ///
    /// 1. Primary window: the `max_count` entries starting at the waiting
    ///    index. An entry is picked if it is not received and either was never
    ///    sent or was last sent at least `resend_duration` ago.
    /// 2. Fill-up: if the first pass produced fewer than `max_count` indices,
    ///    the rest are taken from not-yet-received entries beyond the primary
    ///    window, regardless of when they were last sent. A cursor remembers
    ///    where the previous fill-up stopped and restarts at the end of the
    ///    primary window once it can no longer supply a full batch.
    pub fn send(&mut self, now: Instant, max_count: usize) -> Vec<usize> {
        let resend_duration = self.resend_duration;

        let mut indices_to_send: Vec<usize> = self
            .entries
            .iter()
            .skip(self.start_index_to_send)
            .take(max_count)
            .filter(|entry| {
                !entry.is_received_by_remote
                    && entry
                        .last_sent_at
                        .map_or(true, |sent_at| now.duration_since(sent_at) >= resend_duration)
            })
            .map(|entry| entry.index)
            .collect();

        if indices_to_send.len() < max_count {
            // Saturating arithmetic: a huge `max_count` (e.g. `usize::MAX` for
            // "as many as possible") must not overflow.
            let window_end = self.start_index_to_send.saturating_add(max_count);
            let remaining = max_count - indices_to_send.len();

            // Restart the cursor right after the primary window if it fell
            // behind the window or cannot supply `remaining` more entries.
            let cursor = self.index_to_start_from_if_not_filled_up;
            if cursor < window_end || cursor.saturating_add(remaining) > self.entries.len() {
                self.index_to_start_from_if_not_filled_up = window_end;
            }

            let additional_indices: Vec<usize> = self
                .entries
                .iter()
                .skip(self.index_to_start_from_if_not_filled_up)
                .filter(|entry| {
                    !entry.is_received_by_remote && !indices_to_send.contains(&entry.index)
                })
                .map(|entry| entry.index)
                .take(remaining)
                .collect();

            self.index_to_start_from_if_not_filled_up += additional_indices.len();
            indices_to_send.extend(additional_indices);
        }

        for &chunk_index in &indices_to_send {
            self.entries
                .get_mut(chunk_index)
                .expect("should always be there")
                .sent_at_time(now);
        }

        indices_to_send
    }

    /// Returns `true` once every chunk has been flagged as received by the remote.
    pub fn is_received_by_remote(&self) -> bool {
        self.chunk_count_received_by_remote == self.entries.len()
    }
}