blob_stream/
out_stream.rs

1/*
2 * Copyright (c) Peter Bjorklund. All rights reserved. https://github.com/nimble-rust/nimble
3 * Licensed under the MIT License. See LICENSE in the project root for license information.
4 */
5use std::cmp::min;
6use std::time::{Duration, Instant};
7
8#[derive(Debug)]
9pub enum OutStreamError {
10    ChunkPreviouslyReceivedMarkedAsNotReceived,
11    IndexOutOfBounds,
12}
13
14/// Represents an individual chunk of the blob data being streamed out.
15/// Each `BlobStreamOutEntry` holds metadata about a chunk, including:
16/// - `timer`: The time when the chunk was last sent, or `None` if it has not been sent.
17/// - `index`: The index of the chunk.
18/// - `start` and `end`: Byte ranges representing the chunk's position within the full blob.
19#[derive(Clone, Debug, Eq, PartialEq)]
20pub struct BlobStreamOutEntry {
21    pub last_sent_at: Option<Instant>,
22    pub index: usize,
23    pub is_received_by_remote: bool,
24}
25
26impl BlobStreamOutEntry {
27    /// Creates a new `BlobStreamOutEntry`.
28    ///
29    /// # Arguments
30    ///
31    /// * `index` - The index of the chunk.
32    ///
33    /// # Returns
34    ///
35    /// A new `BlobStreamOutEntry` with a `None` timer.
36    #[must_use]
37    pub fn new(index: usize) -> Self {
38        Self {
39            last_sent_at: None,
40            index,
41            is_received_by_remote: false,
42        }
43    }
44
45    /// Updates the timer to the specified `Instant`, marking the time this entry was last sent.
46    ///
47    /// # Arguments
48    ///
49    /// * `time` - The `Instant` at which the entry is being sent.
50    pub fn sent_at_time(&mut self, time: Instant) {
51        self.last_sent_at = Some(time);
52    }
53}
54
55/// Manages the streaming out of binary blob data, split into fixed-size chunks.
56/// `BlobStreamOut` keeps track of which chunks have been sent, the time they were sent,
57/// and controls resending based on elapsed time since the last send.
58#[allow(unused)]
59#[derive(Debug)]
60pub struct BlobStreamOut {
61    pub(crate) entries: Vec<BlobStreamOutEntry>,
62    start_index_to_send: usize,
63    index_to_start_from_if_not_filled_up: usize,
64    resend_duration: Duration,
65    chunk_count_received_by_remote: usize,
66}
67
68impl BlobStreamOut {
69    /// Creates a new `BlobStreamOut` instance.
70    ///
71    /// # Arguments
72    ///
73    /// * `chunk_count` - The total number of chunks.
74    /// * `resend_duration` - The minimum time that must elapse before resending a chunk.
75    /// * `blob` - The complete binary data to be streamed out.
76    ///
77    /// # Returns
78    ///
79    /// A new `BlobStreamOut` initialized with the provided chunk size, resend duration, and blob data.
80    ///
81    /// # Panics
82    ///
83    /// This function will panic if `fixed_chunk_size` is zero.
84    #[must_use]
85    pub fn new(chunk_count: usize, resend_duration: Duration) -> Self {
86        assert_ne!(chunk_count, 0, "chunk_count cannot be zero");
87
88        // Initialize the entries vector by chunking the blob data
89        let entries: Vec<BlobStreamOutEntry> =
90            (0..chunk_count).map(BlobStreamOutEntry::new).collect();
91
92        Self {
93            entries,
94            resend_duration,
95            index_to_start_from_if_not_filled_up: 0,
96            start_index_to_send: 0,
97            chunk_count_received_by_remote: 0,
98        }
99    }
100
101    pub fn chunk_count(&self) -> usize {
102        self.entries.len()
103    }
104
105    /// Sets the starting index from which to send the next chunk.
106    ///
107    /// # Arguments
108    ///
109    /// * `index` - The starting index of the next chunk to be sent.
110    pub fn set_waiting_for_chunk_index(
111        &mut self,
112        index: usize,
113        receive_mask: u64,
114    ) -> Result<(), OutStreamError> {
115        self.start_index_to_send = index;
116
117        if index > self.start_index_to_send {
118            return Err(OutStreamError::IndexOutOfBounds);
119        }
120        let start = index + 1;
121        let end = min(self.entries.len(), start + 64);
122
123        for previously_received_entry in self.entries[0..index].iter_mut() {
124            if !previously_received_entry.is_received_by_remote {
125                previously_received_entry.is_received_by_remote = true;
126                self.chunk_count_received_by_remote += 1;
127            }
128        }
129
130        if index < self.entries.len() {
131            let waiting_for_entry = self
132                .entries
133                .get_mut(index)
134                .expect("entry index should been validated earlier");
135            // it is not allowed to go from being received by remote to suddenly not be received anymore.
136            if waiting_for_entry.is_received_by_remote {
137                return Err(OutStreamError::ChunkPreviouslyReceivedMarkedAsNotReceived);
138            }
139            waiting_for_entry.last_sent_at = None;
140        }
141
142        let mut mask = receive_mask;
143        for i in index + 1..end {
144            let entry = self
145                .entries
146                .get_mut(i)
147                .expect("entry index should been validated earlier");
148            if mask & 0b1 != 0 {
149                if !entry.is_received_by_remote {
150                    entry.is_received_by_remote = true;
151                    self.chunk_count_received_by_remote += 1;
152                }
153            } else {
154                // it is not allowed to go from being received by remote to suddenly not be received anymore.
155                if entry.is_received_by_remote {
156                    return Err(OutStreamError::ChunkPreviouslyReceivedMarkedAsNotReceived);
157                }
158                entry.last_sent_at = None;
159            }
160            mask >>= 1;
161        }
162
163        Ok(())
164    }
165
166    /// Sends up to `max_count` chunks, starting from the configured `start_index_to_send`.
167    /// Resends chunks if enough time has passed since their last send, or fills in additional
168    /// chunks if the number of filtered chunks is less than `max_count`.
169    ///
170    /// # Arguments
171    ///
172    /// * `now` - The current time used for calculating elapsed time.
173    /// * `max_count` - The maximum number of chunks to send in this batch.
174    ///
175    /// # Returns
176    ///
177    /// A vector containing up to `max_count` `BlobStreamOutEntry` items, representing the chunks to be sent.
178    pub fn send(&mut self, now: Instant, max_count: usize) -> Vec<usize> {
179        // Filter by index range, timer expiration, and limit the number of results
180        let mut filtered_out_indices: Vec<usize> = self
181            .entries
182            .iter()
183            .skip(self.start_index_to_send)
184            .take(max_count) // Limit to MAX_COUNT entries
185            .filter(|entry| {
186                // Check if enough time has passed since the timer was set
187                !entry.is_received_by_remote
188                    && entry
189                        .last_sent_at
190                        .map_or(true, |t| now.duration_since(t) >= self.resend_duration)
191            })
192            .map(|entry| entry.index)
193            .collect(); // Collect into a Vec
194
195        if filtered_out_indices.len() < max_count {
196            let lower_index = self.start_index_to_send + max_count;
197            let expected_remaining = max_count - filtered_out_indices.len();
198
199            if self.index_to_start_from_if_not_filled_up + expected_remaining > self.entries.len() {
200                self.index_to_start_from_if_not_filled_up = lower_index;
201            }
202
203            if self.index_to_start_from_if_not_filled_up < lower_index {
204                self.index_to_start_from_if_not_filled_up = lower_index;
205            }
206
207            // Get additional entries starting from `index_to_start_from_if_not_filled_up`
208            let additional_indicies: Vec<usize> = self
209                .entries
210                .iter()
211                .skip(self.index_to_start_from_if_not_filled_up) // Start from the alternate index
212                .filter(|entry| {
213                    // Ensure that we are not duplicating any already selected entries
214                    !entry.is_received_by_remote
215                        && !filtered_out_indices.iter().any(|e| *e == entry.index)
216                })
217                .map(|entry| entry.index)
218                .take(expected_remaining) // Take only the number of remaining entries
219                .collect();
220
221            self.index_to_start_from_if_not_filled_up += additional_indicies.len();
222            // Append additional entries to fill up to `max_count`
223            filtered_out_indices.extend(additional_indicies);
224        }
225
226        for entry_index in filtered_out_indices.iter() {
227            let ent = self
228                .entries
229                .get_mut(*entry_index)
230                .expect("should always be there");
231            ent.sent_at_time(now);
232        }
233
234        filtered_out_indices
235    }
236
237    pub fn is_received_by_remote(&self) -> bool {
238        self.chunk_count_received_by_remote == self.entries.len()
239    }
240}