blob_stream/out_stream.rs
1/*
2 * Copyright (c) Peter Bjorklund. All rights reserved. https://github.com/nimble-rust/nimble
3 * Licensed under the MIT License. See LICENSE in the project root for license information.
4 */
5use std::cmp::min;
6use std::time::{Duration, Instant};
7
/// Errors reported when a remote acknowledgement cannot be applied to an outgoing blob stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OutStreamError {
    /// The remote reported a chunk as missing even though it had previously
    /// acknowledged that same chunk as received.
    ChunkPreviouslyReceivedMarkedAsNotReceived,
    /// A chunk index is outside the valid range of the stream.
    IndexOutOfBounds,
}

impl std::fmt::Display for OutStreamError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::ChunkPreviouslyReceivedMarkedAsNotReceived => {
                "chunk previously received by remote was reported as not received"
            }
            Self::IndexOutOfBounds => "chunk index is out of bounds",
        };
        f.write_str(message)
    }
}

// Lets the error be boxed as `dyn Error` and propagated with `?`.
impl std::error::Error for OutStreamError {}
13
/// Bookkeeping for a single outgoing chunk of a blob.
///
/// Each entry records:
/// - `last_sent_at`: when the chunk was most recently sent, or `None` when no send is recorded.
/// - `index`: the position of the chunk within the blob's chunk sequence.
/// - `is_received_by_remote`: whether the remote has acknowledged the chunk.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct BlobStreamOutEntry {
    pub last_sent_at: Option<Instant>,
    pub index: usize,
    pub is_received_by_remote: bool,
}

impl BlobStreamOutEntry {
    /// Builds the entry for the chunk at `index`.
    ///
    /// # Arguments
    ///
    /// * `index` - The index of the chunk this entry tracks.
    ///
    /// # Returns
    ///
    /// An entry with no recorded send time that is not yet acknowledged by the remote.
    #[must_use]
    pub fn new(index: usize) -> Self {
        let never_sent: Option<Instant> = None;
        Self {
            index,
            is_received_by_remote: false,
            last_sent_at: never_sent,
        }
    }

    /// Records `time` as the moment this chunk was most recently sent.
    ///
    /// # Arguments
    ///
    /// * `time` - The `Instant` at which the chunk is being sent.
    pub fn sent_at_time(&mut self, time: Instant) {
        // Any earlier timestamp is simply overwritten.
        let _previous = self.last_sent_at.replace(time);
    }
}
54
55/// Manages the streaming out of binary blob data, split into fixed-size chunks.
56/// `BlobStreamOut` keeps track of which chunks have been sent, the time they were sent,
57/// and controls resending based on elapsed time since the last send.
58#[allow(unused)]
59#[derive(Debug)]
60pub struct BlobStreamOut {
61 pub(crate) entries: Vec<BlobStreamOutEntry>,
62 start_index_to_send: usize,
63 index_to_start_from_if_not_filled_up: usize,
64 resend_duration: Duration,
65 chunk_count_received_by_remote: usize,
66}
67
68impl BlobStreamOut {
69 /// Creates a new `BlobStreamOut` instance.
70 ///
71 /// # Arguments
72 ///
73 /// * `chunk_count` - The total number of chunks.
74 /// * `resend_duration` - The minimum time that must elapse before resending a chunk.
75 /// * `blob` - The complete binary data to be streamed out.
76 ///
77 /// # Returns
78 ///
79 /// A new `BlobStreamOut` initialized with the provided chunk size, resend duration, and blob data.
80 ///
81 /// # Panics
82 ///
83 /// This function will panic if `fixed_chunk_size` is zero.
84 #[must_use]
85 pub fn new(chunk_count: usize, resend_duration: Duration) -> Self {
86 assert_ne!(chunk_count, 0, "chunk_count cannot be zero");
87
88 // Initialize the entries vector by chunking the blob data
89 let entries: Vec<BlobStreamOutEntry> =
90 (0..chunk_count).map(BlobStreamOutEntry::new).collect();
91
92 Self {
93 entries,
94 resend_duration,
95 index_to_start_from_if_not_filled_up: 0,
96 start_index_to_send: 0,
97 chunk_count_received_by_remote: 0,
98 }
99 }
100
101 pub fn chunk_count(&self) -> usize {
102 self.entries.len()
103 }
104
105 /// Sets the starting index from which to send the next chunk.
106 ///
107 /// # Arguments
108 ///
109 /// * `index` - The starting index of the next chunk to be sent.
110 pub fn set_waiting_for_chunk_index(
111 &mut self,
112 index: usize,
113 receive_mask: u64,
114 ) -> Result<(), OutStreamError> {
115 self.start_index_to_send = index;
116
117 if index > self.start_index_to_send {
118 return Err(OutStreamError::IndexOutOfBounds);
119 }
120 let start = index + 1;
121 let end = min(self.entries.len(), start + 64);
122
123 for previously_received_entry in self.entries[0..index].iter_mut() {
124 if !previously_received_entry.is_received_by_remote {
125 previously_received_entry.is_received_by_remote = true;
126 self.chunk_count_received_by_remote += 1;
127 }
128 }
129
130 if index < self.entries.len() {
131 let waiting_for_entry = self
132 .entries
133 .get_mut(index)
134 .expect("entry index should been validated earlier");
135 // it is not allowed to go from being received by remote to suddenly not be received anymore.
136 if waiting_for_entry.is_received_by_remote {
137 return Err(OutStreamError::ChunkPreviouslyReceivedMarkedAsNotReceived);
138 }
139 waiting_for_entry.last_sent_at = None;
140 }
141
142 let mut mask = receive_mask;
143 for i in index + 1..end {
144 let entry = self
145 .entries
146 .get_mut(i)
147 .expect("entry index should been validated earlier");
148 if mask & 0b1 != 0 {
149 if !entry.is_received_by_remote {
150 entry.is_received_by_remote = true;
151 self.chunk_count_received_by_remote += 1;
152 }
153 } else {
154 // it is not allowed to go from being received by remote to suddenly not be received anymore.
155 if entry.is_received_by_remote {
156 return Err(OutStreamError::ChunkPreviouslyReceivedMarkedAsNotReceived);
157 }
158 entry.last_sent_at = None;
159 }
160 mask >>= 1;
161 }
162
163 Ok(())
164 }
165
166 /// Sends up to `max_count` chunks, starting from the configured `start_index_to_send`.
167 /// Resends chunks if enough time has passed since their last send, or fills in additional
168 /// chunks if the number of filtered chunks is less than `max_count`.
169 ///
170 /// # Arguments
171 ///
172 /// * `now` - The current time used for calculating elapsed time.
173 /// * `max_count` - The maximum number of chunks to send in this batch.
174 ///
175 /// # Returns
176 ///
177 /// A vector containing up to `max_count` `BlobStreamOutEntry` items, representing the chunks to be sent.
178 pub fn send(&mut self, now: Instant, max_count: usize) -> Vec<usize> {
179 // Filter by index range, timer expiration, and limit the number of results
180 let mut filtered_out_indices: Vec<usize> = self
181 .entries
182 .iter()
183 .skip(self.start_index_to_send)
184 .take(max_count) // Limit to MAX_COUNT entries
185 .filter(|entry| {
186 // Check if enough time has passed since the timer was set
187 !entry.is_received_by_remote
188 && entry
189 .last_sent_at
190 .map_or(true, |t| now.duration_since(t) >= self.resend_duration)
191 })
192 .map(|entry| entry.index)
193 .collect(); // Collect into a Vec
194
195 if filtered_out_indices.len() < max_count {
196 let lower_index = self.start_index_to_send + max_count;
197 let expected_remaining = max_count - filtered_out_indices.len();
198
199 if self.index_to_start_from_if_not_filled_up + expected_remaining > self.entries.len() {
200 self.index_to_start_from_if_not_filled_up = lower_index;
201 }
202
203 if self.index_to_start_from_if_not_filled_up < lower_index {
204 self.index_to_start_from_if_not_filled_up = lower_index;
205 }
206
207 // Get additional entries starting from `index_to_start_from_if_not_filled_up`
208 let additional_indicies: Vec<usize> = self
209 .entries
210 .iter()
211 .skip(self.index_to_start_from_if_not_filled_up) // Start from the alternate index
212 .filter(|entry| {
213 // Ensure that we are not duplicating any already selected entries
214 !entry.is_received_by_remote
215 && !filtered_out_indices.iter().any(|e| *e == entry.index)
216 })
217 .map(|entry| entry.index)
218 .take(expected_remaining) // Take only the number of remaining entries
219 .collect();
220
221 self.index_to_start_from_if_not_filled_up += additional_indicies.len();
222 // Append additional entries to fill up to `max_count`
223 filtered_out_indices.extend(additional_indicies);
224 }
225
226 for entry_index in filtered_out_indices.iter() {
227 let ent = self
228 .entries
229 .get_mut(*entry_index)
230 .expect("should always be there");
231 ent.sent_at_time(now);
232 }
233
234 filtered_out_indices
235 }
236
237 pub fn is_received_by_remote(&self) -> bool {
238 self.chunk_count_received_by_remote == self.entries.len()
239 }
240}