nimble_blob_stream/
out_stream.rs1use monotonic_time_rs::Millis;
6use std::cmp::min;
7use std::num::TryFromIntError;
8use std::time::Duration;
9
/// Errors reported by the outgoing side of a blob stream.
#[derive(Debug)]
#[allow(clippy::module_name_repetitions)]
pub enum OutStreamError {
    /// The remote reported a chunk as missing although this side had already recorded it
    /// as received. Returned by `BlobStreamOut::set_waiting_for_chunk_index`.
    ChunkPreviouslyReceivedMarkedAsNotReceived,
    /// A chunk index was outside the range of chunks tracked by the stream.
    IndexOutOfBounds,
    /// Carries the failed integer conversion.
    // NOTE(review): not produced anywhere in this file; presumably raised when a blob's
    // size or chunk count does not fit the target integer type - confirm at the call site.
    BlobIsTooLarge(TryFromIntError),
    // NOTE(review): not produced anywhere in this file; presumably a start-transfer request
    // arrived in a state where it was not expected - confirm at the call site.
    UnexpectedStartTransfer,
    // NOTE(review): not produced anywhere in this file; presumably the configured fixed
    // chunk size exceeds a protocol limit - confirm at the call site.
    FixedChunkSizeIsTooLarge,
}
19
/// Send/acknowledge bookkeeping for a single chunk of an outgoing blob.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct BlobStreamOutEntry {
    /// Time given to the most recent `sent_at_time` call, or `None` if the chunk has never
    /// been sent or its send time was cleared so that it is sent again.
    pub last_sent_at: Option<Millis>,
    /// Index of the chunk this entry describes; `BlobStreamOut::new` assigns `0..chunk_count`.
    pub index: u32,
    /// `true` once the remote has been recorded as having received this chunk.
    pub is_received_by_remote: bool,
}
31
32impl BlobStreamOutEntry {
33 #[must_use]
43 pub const fn new(index: u32) -> Self {
44 Self {
45 last_sent_at: None,
46 index,
47 is_received_by_remote: false,
48 }
49 }
50
51 pub fn sent_at_time(&mut self, time: Millis) {
57 self.last_sent_at = Some(time);
58 }
59}
60
/// Tracks, per chunk, what has been sent to and acknowledged by the remote for one
/// outgoing blob, and decides which chunks to send next.
#[allow(unused)]
#[derive(Debug)]
pub struct BlobStreamOut {
    /// One entry per chunk, created by `new` for the indices `0..chunk_count`.
    pub(crate) entries: Vec<BlobStreamOutEntry>,
    /// Index the remote last reported it is waiting for (set by
    /// `set_waiting_for_chunk_index`); `send` starts its first pass here.
    start_index_to_send: usize,
    /// Cursor into `entries` that `send` continues from when its first pass yields fewer
    /// than the requested number of chunks.
    index_to_start_from_if_not_filled_up: usize,
    /// Time that must have elapsed since an entry's `last_sent_at` before the first pass
    /// of `send` selects that entry again.
    resend_duration: Duration,
    /// Number of entries whose `is_received_by_remote` flag has been set; compared against
    /// `entries.len()` by `is_received_by_remote`.
    chunk_count_received_by_remote: usize,
}
75
76impl BlobStreamOut {
77 #[must_use]
93 pub fn new(chunk_count: u32, resend_duration: Duration) -> Self {
94 assert_ne!(chunk_count, 0, "chunk_count cannot be zero");
95
96 let entries: Vec<BlobStreamOutEntry> =
98 (0u32..chunk_count).map(BlobStreamOutEntry::new).collect();
99
100 Self {
101 entries,
102 resend_duration,
103 index_to_start_from_if_not_filled_up: 0,
104 start_index_to_send: 0,
105 chunk_count_received_by_remote: 0,
106 }
107 }
108
109 #[must_use]
110 #[allow(clippy::cast_possible_truncation)]
111 pub fn chunk_count(&self) -> u32 {
112 self.entries.len() as u32
113 }
114
115 #[allow(clippy::missing_panics_doc)]
124 pub fn set_waiting_for_chunk_index(
125 &mut self,
126 index: usize,
127 receive_mask: u64,
128 ) -> Result<(), OutStreamError> {
129 self.start_index_to_send = index;
130
131 if index > self.start_index_to_send {
132 return Err(OutStreamError::IndexOutOfBounds);
133 }
134 let start = index + 1;
135 let end = min(self.entries.len(), start + 64);
136
137 for previously_received_entry in &mut self.entries[0..index] {
138 if !previously_received_entry.is_received_by_remote {
139 previously_received_entry.is_received_by_remote = true;
140 self.chunk_count_received_by_remote += 1;
141 }
142 }
143
144 if index < self.entries.len() {
145 let waiting_for_entry = self
146 .entries
147 .get_mut(index)
148 .expect("entry index should been validated earlier");
149 if waiting_for_entry.is_received_by_remote {
151 return Err(OutStreamError::ChunkPreviouslyReceivedMarkedAsNotReceived);
152 }
153 waiting_for_entry.last_sent_at = None;
154 }
155
156 let mut mask = receive_mask;
157 for i in index + 1..end {
158 #[allow(clippy::missing_panics_doc)]
159 let entry = self
160 .entries
161 .get_mut(i)
162 .expect("entry index should been validated earlier");
163 if mask & 0b1 != 0 {
164 if !entry.is_received_by_remote {
165 entry.is_received_by_remote = true;
166 self.chunk_count_received_by_remote += 1;
167 }
168 } else {
169 if entry.is_received_by_remote {
171 return Err(OutStreamError::ChunkPreviouslyReceivedMarkedAsNotReceived);
172 }
173 entry.last_sent_at = None;
174 }
175 mask >>= 1;
176 }
177
178 Ok(())
179 }
180
181 #[must_use]
194 #[allow(clippy::missing_panics_doc)]
195 pub fn send(&mut self, now: Millis, max_count: usize) -> Vec<u32> {
196 let mut filtered_out_indices: Vec<u32> = self
198 .entries
199 .iter()
200 .skip(self.start_index_to_send)
201 .take(max_count) .filter(|entry| {
203 !entry.is_received_by_remote
205 && entry
206 .last_sent_at
207 .map_or(true, |t| now.duration_since(t) >= self.resend_duration)
208 })
209 .map(|entry| entry.index)
210 .collect(); if filtered_out_indices.len() < max_count {
213 let lower_index = self.start_index_to_send + max_count;
214 let expected_remaining = max_count - filtered_out_indices.len();
215
216 if self.index_to_start_from_if_not_filled_up + expected_remaining > self.entries.len() {
217 self.index_to_start_from_if_not_filled_up = lower_index;
218 }
219
220 if self.index_to_start_from_if_not_filled_up < lower_index {
221 self.index_to_start_from_if_not_filled_up = lower_index;
222 }
223
224 let additional_indicies: Vec<u32> = self
226 .entries
227 .iter()
228 .skip(self.index_to_start_from_if_not_filled_up) .filter(|entry| {
230 !entry.is_received_by_remote
232 && !filtered_out_indices.iter().any(|e| *e == entry.index)
233 })
234 .map(|entry| entry.index)
235 .take(expected_remaining) .collect();
237
238 self.index_to_start_from_if_not_filled_up += additional_indicies.len();
239 filtered_out_indices.extend(additional_indicies);
241 }
242
243 for entry_index in &filtered_out_indices {
244 #[allow(clippy::missing_panics_doc)]
245 let ent = self
246 .entries
247 .get_mut(*entry_index as usize)
248 .expect("should always be there");
249 ent.sent_at_time(now);
250 }
251
252 filtered_out_indices
253 }
254
255 #[must_use]
256 pub fn is_received_by_remote(&self) -> bool {
257 self.chunk_count_received_by_remote == self.entries.len()
258 }
259}