nimble_blob_stream/
out_stream.rs

1/*
2 * Copyright (c) Peter Bjorklund. All rights reserved. https://github.com/nimble-rust/nimble
3 * Licensed under the MIT License. See LICENSE in the project root for license information.
4 */
5use monotonic_time_rs::Millis;
6use std::cmp::min;
7use std::num::TryFromIntError;
8use std::time::Duration;
9
10#[derive(Debug)]
11#[allow(clippy::module_name_repetitions)]
12pub enum OutStreamError {
13    ChunkPreviouslyReceivedMarkedAsNotReceived,
14    IndexOutOfBounds,
15    BlobIsTooLarge(TryFromIntError),
16    UnexpectedStartTransfer,
17    FixedChunkSizeIsTooLarge,
18}
19
20/// Represents an individual chunk of the blob data being streamed out.
21/// Each `BlobStreamOutEntry` holds metadata about a chunk, including:
22/// - `timer`: The time when the chunk was last sent, or `None` if it has not been sent.
23/// - `index`: The index of the chunk.
24/// - `start` and `end`: Byte ranges representing the chunk's position within the full blob.
25#[derive(Clone, Debug, Eq, PartialEq)]
26pub struct BlobStreamOutEntry {
27    pub last_sent_at: Option<Millis>,
28    pub index: u32,
29    pub is_received_by_remote: bool,
30}
31
32impl BlobStreamOutEntry {
33    /// Creates a new `BlobStreamOutEntry`.
34    ///
35    /// # Arguments
36    ///
37    /// * `index` - The index of the chunk.
38    ///
39    /// # Returns
40    ///
41    /// A new `BlobStreamOutEntry` with a `None` timer.
42    #[must_use]
43    pub const fn new(index: u32) -> Self {
44        Self {
45            last_sent_at: None,
46            index,
47            is_received_by_remote: false,
48        }
49    }
50
51    /// Updates the timer to the specified `Instant`, marking the time this entry was last sent.
52    ///
53    /// # Arguments
54    ///
55    /// * `time` - The `Instant` at which the entry is being sent.
56    pub fn sent_at_time(&mut self, time: Millis) {
57        self.last_sent_at = Some(time);
58    }
59}
60
61/// Manages the streaming out of binary blob data
62///
63/// It splits into fixed-size chunks.
64/// `BlobStreamOut` keeps track of which chunks have been sent, the time they were sent,
65/// and controls resending based on elapsed time since the last send.
66#[allow(unused)]
67#[derive(Debug)]
68pub struct BlobStreamOut {
69    pub(crate) entries: Vec<BlobStreamOutEntry>,
70    start_index_to_send: usize,
71    index_to_start_from_if_not_filled_up: usize,
72    resend_duration: Duration,
73    chunk_count_received_by_remote: usize,
74}
75
76impl BlobStreamOut {
77    /// Creates a new `BlobStreamOut` instance.
78    ///
79    /// # Arguments
80    ///
81    /// * `chunk_count` - The total number of chunks.
82    /// * `resend_duration` - The minimum time that must elapse before resending a chunk.
83    /// * `blob` - The complete binary data to be streamed out.
84    ///
85    /// # Returns
86    ///
87    /// A new `BlobStreamOut` initialized with the provided chunk size, resend duration, and blob data.
88    ///
89    /// # Panics
90    ///
91    /// This function will panic if `fixed_chunk_size` is zero.
92    #[must_use]
93    pub fn new(chunk_count: u32, resend_duration: Duration) -> Self {
94        assert_ne!(chunk_count, 0, "chunk_count cannot be zero");
95
96        // Initialize the entries vector by chunking the blob data
97        let entries: Vec<BlobStreamOutEntry> =
98            (0u32..chunk_count).map(BlobStreamOutEntry::new).collect();
99
100        Self {
101            entries,
102            resend_duration,
103            index_to_start_from_if_not_filled_up: 0,
104            start_index_to_send: 0,
105            chunk_count_received_by_remote: 0,
106        }
107    }
108
109    #[must_use]
110    #[allow(clippy::cast_possible_truncation)]
111    pub fn chunk_count(&self) -> u32 {
112        self.entries.len() as u32
113    }
114
115    /// Sets the starting index from which to send the next chunk.
116    ///
117    /// # Arguments
118    ///
119    /// * `index` - The starting index of the next chunk to be sent.
120    ///
121    /// # Errors
122    /// Can return error `OutStreamError` // TODO:
123    #[allow(clippy::missing_panics_doc)]
124    pub fn set_waiting_for_chunk_index(
125        &mut self,
126        index: usize,
127        receive_mask: u64,
128    ) -> Result<(), OutStreamError> {
129        self.start_index_to_send = index;
130
131        if index > self.start_index_to_send {
132            return Err(OutStreamError::IndexOutOfBounds);
133        }
134        let start = index + 1;
135        let end = min(self.entries.len(), start + 64);
136
137        for previously_received_entry in &mut self.entries[0..index] {
138            if !previously_received_entry.is_received_by_remote {
139                previously_received_entry.is_received_by_remote = true;
140                self.chunk_count_received_by_remote += 1;
141            }
142        }
143
144        if index < self.entries.len() {
145            let waiting_for_entry = self
146                .entries
147                .get_mut(index)
148                .expect("entry index should been validated earlier");
149            // it is not allowed to go from being received by remote to suddenly not be received anymore.
150            if waiting_for_entry.is_received_by_remote {
151                return Err(OutStreamError::ChunkPreviouslyReceivedMarkedAsNotReceived);
152            }
153            waiting_for_entry.last_sent_at = None;
154        }
155
156        let mut mask = receive_mask;
157        for i in index + 1..end {
158            #[allow(clippy::missing_panics_doc)]
159            let entry = self
160                .entries
161                .get_mut(i)
162                .expect("entry index should been validated earlier");
163            if mask & 0b1 != 0 {
164                if !entry.is_received_by_remote {
165                    entry.is_received_by_remote = true;
166                    self.chunk_count_received_by_remote += 1;
167                }
168            } else {
169                // it is not allowed to go from being received by remote to suddenly not be received anymore.
170                if entry.is_received_by_remote {
171                    return Err(OutStreamError::ChunkPreviouslyReceivedMarkedAsNotReceived);
172                }
173                entry.last_sent_at = None;
174            }
175            mask >>= 1;
176        }
177
178        Ok(())
179    }
180
181    /// Sends up to `max_count` chunks, starting from the configured `start_index_to_send`.
182    /// Resends chunks if enough time has passed since their last send, or fills in additional
183    /// chunks if the number of filtered chunks is less than `max_count`.
184    ///
185    /// # Arguments
186    ///
187    /// * `now` - The current time used for calculating elapsed time.
188    /// * `max_count` - The maximum number of chunks to send in this batch.
189    ///
190    /// # Returns
191    ///
192    /// A vector containing up to `max_count` `BlobStreamOutEntry` items, representing the chunks to be sent.
193    #[must_use]
194    #[allow(clippy::missing_panics_doc)]
195    pub fn send(&mut self, now: Millis, max_count: usize) -> Vec<u32> {
196        // Filter by index range, timer expiration, and limit the number of results
197        let mut filtered_out_indices: Vec<u32> = self
198            .entries
199            .iter()
200            .skip(self.start_index_to_send)
201            .take(max_count) // Limit to MAX_COUNT entries
202            .filter(|entry| {
203                // Check if enough time has passed since the timer was set
204                !entry.is_received_by_remote
205                    && entry
206                    .last_sent_at
207                    .map_or(true, |t| now.duration_since(t) >= self.resend_duration)
208            })
209            .map(|entry| entry.index)
210            .collect(); // Collect into a Vec
211
212        if filtered_out_indices.len() < max_count {
213            let lower_index = self.start_index_to_send + max_count;
214            let expected_remaining = max_count - filtered_out_indices.len();
215
216            if self.index_to_start_from_if_not_filled_up + expected_remaining > self.entries.len() {
217                self.index_to_start_from_if_not_filled_up = lower_index;
218            }
219
220            if self.index_to_start_from_if_not_filled_up < lower_index {
221                self.index_to_start_from_if_not_filled_up = lower_index;
222            }
223
224            // Get additional entries starting from `index_to_start_from_if_not_filled_up`
225            let additional_indicies: Vec<u32> = self
226                .entries
227                .iter()
228                .skip(self.index_to_start_from_if_not_filled_up) // Start from the alternate index
229                .filter(|entry| {
230                    // Ensure that we are not duplicating any already selected entries
231                    !entry.is_received_by_remote
232                        && !filtered_out_indices.iter().any(|e| *e == entry.index)
233                })
234                .map(|entry| entry.index)
235                .take(expected_remaining) // Take only the number of remaining entries
236                .collect();
237
238            self.index_to_start_from_if_not_filled_up += additional_indicies.len();
239            // Append additional entries to fill up to `max_count`
240            filtered_out_indices.extend(additional_indicies);
241        }
242
243        for entry_index in &filtered_out_indices {
244            #[allow(clippy::missing_panics_doc)]
245            let ent = self
246                .entries
247                .get_mut(*entry_index as usize)
248                .expect("should always be there");
249            ent.sent_at_time(now);
250        }
251
252        filtered_out_indices
253    }
254
255    #[must_use]
256    pub fn is_received_by_remote(&self) -> bool {
257        self.chunk_count_received_by_remote == self.entries.len()
258    }
259}