// xet_data/file_reconstruction/reconstruction_terms/manager.rs

use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use more_asserts::*;
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tracing::{debug, info};
use xet_client::cas_client::Client;
use xet_client::cas_types::FileRange;
use xet_core_structures::ExpWeightedMovingAvg;
use xet_core_structures::merklehash::MerkleHash;
use xet_runtime::config::ReconstructionConfig;

use super::super::FileReconstructionError;
use super::super::error::Result;
use super::file_term::{FileTerm, retrieve_file_term_block};
use crate::progress_tracking::ItemProgressUpdater;

20type RawFetchedFileTerms = Result<Option<(Vec<FileTerm>, u64, u64)>>;
21
22/// Manages the iteration over file terms during reconstruction, with adaptive prefetching.
23/// Prefetches reconstruction blocks ahead of consumption based on observed completion rates
24/// to minimize download latency while controlling memory usage.
25pub struct ReconstructionTermManager {
26    config: Arc<ReconstructionConfig>,
27    client: Arc<dyn Client>,
28    file_hash: MerkleHash,
29    requested_byte_range: FileRange,
30    last_block_info: Option<(Instant, FileRange)>,
31    known_final_byte_position: Arc<AtomicU64>,
32    prefetched_byte_position: u64,
33    current_active_byte_position: u64,
34    prefetch_queue: VecDeque<JoinHandle<RawFetchedFileTerms>>,
35    completion_rate_estimator: ExpWeightedMovingAvg,
36    progress_updater: Option<Arc<ItemProgressUpdater>>,
37    total_bytes_reported: u64,
38    total_transfer_bytes_reported: u64,
39}
40
41impl ReconstructionTermManager {
42    pub async fn new(
43        config: Arc<ReconstructionConfig>,
44        client: Arc<dyn Client>,
45        file_hash: MerkleHash,
46        file_byte_range: FileRange,
47        progress_updater: Option<Arc<ItemProgressUpdater>>,
48    ) -> Result<Self> {
49        let completion_rate_estimator =
50            ExpWeightedMovingAvg::new_count_decay(config.completion_rate_estimator_half_life);
51
52        let requested_byte_range = file_byte_range;
53
54        let mut s = Self {
55            config,
56            client,
57            file_hash,
58            requested_byte_range,
59            last_block_info: None,
60            prefetched_byte_position: requested_byte_range.start,
61            current_active_byte_position: requested_byte_range.start,
62            prefetch_queue: VecDeque::new(),
63            known_final_byte_position: Arc::new(AtomicU64::new(requested_byte_range.end)),
64            completion_rate_estimator,
65            progress_updater,
66            total_bytes_reported: 0,
67            total_transfer_bytes_reported: 0,
68        };
69
70        // Start things by prefetching two smaller blocks to get things started.  This way,
71        // once the first block is finished, we have a second block to start processing -- and
72        // an estimate of the completion time based on the first one.  This helps us to get
73        // a better estimate of the completion time.
74        let initial_fetch_size = s.config.min_reconstruction_fetch_size.as_u64();
75        s.prefetch_block(initial_fetch_size).await?;
76        s.prefetch_block(2 * initial_fetch_size).await?;
77
78        debug!(
79            %file_hash,
80            prefetch_queue_size = s.prefetch_queue.len(),
81            "Initial prefetch blocks queued"
82        );
83
84        Ok(s)
85    }
86
87    /// Returns the next block of file terms, or None if reconstruction is complete.
88    /// Updates the completion rate estimator based on the time since the last call.
89    pub async fn next_file_terms(&mut self) -> Result<Option<Vec<FileTerm>>> {
90        // Update completion rate estimator if we have timing info from a previous block.
91        if let Some((block_start_time, block_range)) = self.last_block_info.take() {
92            let completion_time = Instant::now().duration_since(block_start_time).as_secs_f64();
93            let block_size = block_range.end - block_range.start;
94
95            if block_size != 0 {
96                self.completion_rate_estimator
97                    .update((block_size as f64) / completion_time.max(1e-6));
98            }
99
100            info!(
101                file_hash = %self.file_hash,
102                block_start = block_range.start,
103                block_end = block_range.end,
104                block_size = block_size,
105                completion_time = completion_time,
106                "Updated completion rate estimate based on previous block completion time (seconds)."
107            );
108        }
109
110        // Check the prefetch buffer to possibly prefetch the next block.
111        self.check_prefetch_buffer().await?;
112
113        let Some(next_block_jh) = self.prefetch_queue.pop_front() else {
114            // If there are no more prefetched terms then we're done.
115            // Note: we check against known_final_byte_position since requested_byte_range.end
116            // may be u64::MAX if the full file was requested.
117            debug_assert_ge!(self.prefetched_byte_position, self.known_final_byte_position.load(Ordering::Relaxed));
118            return Ok(None);
119        };
120
121        let maybe_next_block = next_block_jh
122            .await
123            .map_err(|e| FileReconstructionError::InternalError(format!("Join error: {e}")))??;
124
125        if let Some((file_terms, new_bytes, new_transfer_bytes)) = maybe_next_block {
126            // Extract the download domain from the first file term's URL.
127            let domain = file_terms
128                .first()
129                .and_then(|t| t.url_info.xorb_block_retrieval_urls.try_read().ok())
130                .and_then(|urls| {
131                    urls.1
132                        .first()
133                        .and_then(|(url, _)| url::Url::parse(url).ok())
134                        .and_then(|u| u.host_str().map(str::to_owned))
135                });
136
137            // Calculate the byte range of this block from the file terms.
138            let block_start = file_terms.first().map(|t| t.byte_range.start).unwrap_or(0);
139            let block_end = file_terms.last().map(|t| t.byte_range.end).unwrap_or(0);
140
141            // Record timing info for the next call.
142            self.last_block_info = Some((Instant::now(), FileRange::new(block_start, block_end)));
143
144            // Update the current active byte position.
145            self.current_active_byte_position = block_end;
146
147            info!(
148                file_hash = %self.file_hash,
149                domain = domain.as_deref().unwrap_or("unknown"),
150                block_start = block_start,
151                block_end = block_end,
152                block_size = file_terms.len(),
153                "Received block of file terms from prefetch queue"
154            );
155
156            if let Some(progress_updater) = &self.progress_updater {
157                self.total_bytes_reported = self.total_bytes_reported.saturating_add(new_bytes);
158                self.total_transfer_bytes_reported =
159                    self.total_transfer_bytes_reported.saturating_add(new_transfer_bytes);
160                progress_updater.update_item_size(self.total_bytes_reported, false);
161                progress_updater.update_transfer_size(self.total_transfer_bytes_reported);
162            }
163
164            Ok(Some(file_terms))
165        } else {
166            // We've completed the iteration, so record the final byte position.
167            self.known_final_byte_position
168                .store(self.prefetched_byte_position, Ordering::Relaxed);
169
170            if let Some(progress_updater) = &self.progress_updater {
171                progress_updater.update_item_size(self.total_bytes_reported, true);
172            }
173
174            info!(
175                file_hash = %self.file_hash,
176                prefetched_byte_position = self.prefetched_byte_position,
177                "Completed prefetch queue; end of file reached."
178            );
179
180            Ok(None)
181        }
182    }
183
184    fn is_done_fetching(&self) -> bool {
185        self.prefetched_byte_position >= self.known_final_byte_position.load(Ordering::Relaxed)
186    }
187
188    /// Checks the prefetch queue to ensure that we have enough incoming to keep everything happy.
189    async fn check_prefetch_buffer(&mut self) -> Result<()> {
190        // If we're done, then there's nothing more to do.
191        if self.is_done_fetching() {
192            return Ok(());
193        }
194
195        // How long we expect for a reconstruction block to complete.
196        let target_completion_time = self.config.target_block_completion_time.as_secs_f64();
197
198        // We choose a next block size to complete within minutes based on the
199        // current observations of how long it takes.
200        let completion_rate = self.completion_rate_estimator.value();
201
202        // The target prefetch buffer size.  We want to make sure at least
203        // this much has been prefetched.
204        let prefetch_buffer_target_size = target_completion_time * completion_rate;
205
206        // We need to maintain a minimum amount in the prefetch buffer.
207        let min_prefetch_buffer_size = self.config.min_prefetch_buffer.as_u64() as f64;
208        let prefetch_buffer_size = prefetch_buffer_target_size.max(min_prefetch_buffer_size);
209
210        // The current prefetch buffer size; we want to expand this by the target size.
211        let current_prefetch_buffer_size = self.prefetched_byte_position - self.current_active_byte_position;
212
213        // If we're already at or above the target prefetch buffer size, then don't prefetch more
214        // unless the queue is empty.
215        if !self.prefetch_queue.is_empty() && prefetch_buffer_size <= current_prefetch_buffer_size as f64 {
216            return Ok(());
217        }
218
219        // Let's see what we need to prefetch here.
220        let next_prefetch_target_block_size = (prefetch_buffer_size - current_prefetch_buffer_size as f64) as u64;
221
222        let min_fetch_size = self.config.min_reconstruction_fetch_size.as_u64();
223        let max_fetch_size = self.config.max_reconstruction_fetch_size.as_u64().max(min_fetch_size);
224        let next_prefetch_block_size = next_prefetch_target_block_size.clamp(min_fetch_size, max_fetch_size);
225
226        // Okay, now add this to the prefetch queue.
227        self.prefetch_block(next_prefetch_block_size).await
228    }
229
230    async fn prefetch_block(&mut self, block_size: u64) -> Result<()> {
231        let block_size = block_size.clamp(
232            self.config.min_reconstruction_fetch_size.as_u64(),
233            self.config.max_reconstruction_fetch_size.as_u64(),
234        );
235
236        // First, check the block range to see if we're over the requested range.
237        let mut prefetch_block_range =
238            FileRange::new(self.prefetched_byte_position, self.prefetched_byte_position + block_size);
239
240        // Get the end of the known range, if it is known.  If it's unknown, this is u64::MAX.
241        let last_byte_position = self
242            .known_final_byte_position
243            .load(Ordering::Relaxed)
244            .min(self.requested_byte_range.end);
245
246        // Clamp to the requested range.
247        if prefetch_block_range.end > last_byte_position {
248            prefetch_block_range.end = last_byte_position;
249        }
250
251        // Check if we should extend this one to the end.
252        let min_fetch_size = self.config.min_reconstruction_fetch_size.as_u64();
253        if prefetch_block_range.end + min_fetch_size > self.requested_byte_range.end {
254            prefetch_block_range.end = self.requested_byte_range.end;
255        }
256
257        // It's possible that the start is at or past the end of the requested range; in that case, do nothing.
258        // This also handles empty files where start >= end.
259        if prefetch_block_range.start >= prefetch_block_range.end {
260            debug!(
261                file_hash = %self.file_hash,
262                "Prefetch block skipped - already at or past end of requested range"
263            );
264            return Ok(());
265        }
266
267        let actual_block_size = prefetch_block_range.end - prefetch_block_range.start;
268        info!(
269            file_hash = %self.file_hash,
270            prefetch_range = ?(prefetch_block_range.start, prefetch_block_range.end),
271            requested_block_size = block_size,
272            actual_block_size,
273            queue_depth = self.prefetch_queue.len() + 1,
274            "Scheduling prefetch block"
275        );
276
277        // Update the prefetched position now.
278        self.prefetched_byte_position = prefetch_block_range.end;
279
280        // Add the prefetch task to the queue.
281        let known_final_byte_position = self.known_final_byte_position.clone();
282        let client = self.client.clone();
283        let file_hash = self.file_hash;
284
285        let jh = tokio::task::spawn(async move {
286            let result = retrieve_file_term_block(client, file_hash, prefetch_block_range).await;
287
288            // See if we're done with the file.
289            if let Ok(Some((ref returned_range, transfer_bytes, ref file_terms))) = result {
290                // See if the returned range is less than the requested range; if so, then
291                // we know we've reached the end of the file.
292                debug_assert_eq!(returned_range.start, prefetch_block_range.start);
293
294                if returned_range.end < prefetch_block_range.end {
295                    known_final_byte_position.store(returned_range.end, Ordering::Relaxed);
296                }
297
298                let new_bytes = returned_range.end.saturating_sub(returned_range.start);
299                Ok(Some((file_terms.clone(), new_bytes, transfer_bytes)))
300            } else if let Ok(None) = result {
301                // If the returned block is None, then we're beyond the end of the file; update the known final byte
302                // position to the start of the prefetch block if it hasn't been set yet (which it might
303                // have been in a separate block).
304                known_final_byte_position.fetch_min(prefetch_block_range.start, Ordering::Relaxed);
305                Ok(None)
306            } else {
307                result.map(|r| {
308                    r.map(|(returned_range, transfer_bytes, file_terms)| {
309                        let new_bytes = returned_range.end.saturating_sub(returned_range.start);
310                        (file_terms, new_bytes, transfer_bytes)
311                    })
312                })
313            }
314        });
315
316        self.prefetch_queue.push_back(jh);
317
318        Ok(())
319    }
320}