xet_data/file_reconstruction/reconstruction_terms/
manager.rs1use std::collections::VecDeque;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4
5use more_asserts::*;
6use tokio::task::JoinHandle;
7use tokio::time::Instant;
8use tracing::{debug, info};
9use xet_client::cas_client::Client;
10use xet_client::cas_types::FileRange;
11use xet_core_structures::ExpWeightedMovingAvg;
12use xet_core_structures::merklehash::MerkleHash;
13use xet_runtime::config::ReconstructionConfig;
14
15use super::super::FileReconstructionError;
16use super::super::error::Result;
17use super::file_term::{FileTerm, retrieve_file_term_block};
18use crate::progress_tracking::ItemProgressUpdater;
19
/// Output of one background fetch task spawned by `prefetch_block`.
///
/// `Ok(None)` means `retrieve_file_term_block` returned nothing for the block, which the
/// manager treats as the file ending at or before the block's start. Otherwise the tuple is
/// `(terms, new_bytes, transfer_bytes)`:
/// - `new_bytes` is the length of the byte range actually returned.
/// - `transfer_bytes` is the transfer byte count reported by `retrieve_file_term_block`.
type RawFetchedFileTerms = Result<Option<(Vec<FileTerm>, u64, u64)>>;
21
/// Streams the file terms covering a byte range of one file, in file order, fetching blocks
/// of terms ahead of the consumer in background tasks.
///
/// The amount of data kept in flight is sized from an estimate of how quickly the consumer
/// works through returned blocks, bounded by the fetch-size and prefetch-buffer settings in
/// [`ReconstructionConfig`].
pub struct ReconstructionTermManager {
    /// Tuning parameters: fetch-size bounds, minimum prefetch buffer, target block
    /// completion time, and the completion-rate estimator half-life.
    config: Arc<ReconstructionConfig>,
    /// Client handed to `retrieve_file_term_block` by each fetch task.
    client: Arc<dyn Client>,
    /// Hash of the file whose terms are being fetched.
    file_hash: MerkleHash,
    /// Byte range requested by the caller; no block is scheduled past its end.
    requested_byte_range: FileRange,
    /// When the most recently returned block was handed to the consumer, and its byte range.
    /// Taken on the next call to update the completion-rate estimate.
    last_block_info: Option<(Instant, FileRange)>,
    /// Best-known end of the data, shared with the fetch tasks. Starts at the requested end.
    /// Fetch tasks update it when a block comes back short or empty. It is set to
    /// `prefetched_byte_position` once a fetch reports no data.
    known_final_byte_position: Arc<AtomicU64>,
    /// End position of the last block scheduled for fetching.
    prefetched_byte_position: u64,
    /// End position of the last block returned to the consumer.
    current_active_byte_position: u64,
    /// Handles of in-flight fetch tasks, in the order their blocks appear in the file.
    prefetch_queue: VecDeque<JoinHandle<RawFetchedFileTerms>>,
    /// Exponentially weighted moving average of the consumer's processing rate, in bytes
    /// per second.
    completion_rate_estimator: ExpWeightedMovingAvg,
    /// Optional sink for item-size and transfer-size progress updates.
    progress_updater: Option<Arc<ItemProgressUpdater>>,
    /// Running total of bytes covered by returned blocks; accumulated only while a progress
    /// updater is attached.
    total_bytes_reported: u64,
    /// Running total of transfer bytes of returned blocks; accumulated only while a progress
    /// updater is attached.
    total_transfer_bytes_reported: u64,
}
40
41impl ReconstructionTermManager {
42 pub async fn new(
43 config: Arc<ReconstructionConfig>,
44 client: Arc<dyn Client>,
45 file_hash: MerkleHash,
46 file_byte_range: FileRange,
47 progress_updater: Option<Arc<ItemProgressUpdater>>,
48 ) -> Result<Self> {
49 let completion_rate_estimator =
50 ExpWeightedMovingAvg::new_count_decay(config.completion_rate_estimator_half_life);
51
52 let requested_byte_range = file_byte_range;
53
54 let mut s = Self {
55 config,
56 client,
57 file_hash,
58 requested_byte_range,
59 last_block_info: None,
60 prefetched_byte_position: requested_byte_range.start,
61 current_active_byte_position: requested_byte_range.start,
62 prefetch_queue: VecDeque::new(),
63 known_final_byte_position: Arc::new(AtomicU64::new(requested_byte_range.end)),
64 completion_rate_estimator,
65 progress_updater,
66 total_bytes_reported: 0,
67 total_transfer_bytes_reported: 0,
68 };
69
70 let initial_fetch_size = s.config.min_reconstruction_fetch_size.as_u64();
75 s.prefetch_block(initial_fetch_size).await?;
76 s.prefetch_block(2 * initial_fetch_size).await?;
77
78 debug!(
79 %file_hash,
80 prefetch_queue_size = s.prefetch_queue.len(),
81 "Initial prefetch blocks queued"
82 );
83
84 Ok(s)
85 }
86
87 pub async fn next_file_terms(&mut self) -> Result<Option<Vec<FileTerm>>> {
90 if let Some((block_start_time, block_range)) = self.last_block_info.take() {
92 let completion_time = Instant::now().duration_since(block_start_time).as_secs_f64();
93 let block_size = block_range.end - block_range.start;
94
95 if block_size != 0 {
96 self.completion_rate_estimator
97 .update((block_size as f64) / completion_time.max(1e-6));
98 }
99
100 info!(
101 file_hash = %self.file_hash,
102 block_start = block_range.start,
103 block_end = block_range.end,
104 block_size = block_size,
105 completion_time = completion_time,
106 "Updated completion rate estimate based on previous block completion time (seconds)."
107 );
108 }
109
110 self.check_prefetch_buffer().await?;
112
113 let Some(next_block_jh) = self.prefetch_queue.pop_front() else {
114 debug_assert_ge!(self.prefetched_byte_position, self.known_final_byte_position.load(Ordering::Relaxed));
118 return Ok(None);
119 };
120
121 let maybe_next_block = next_block_jh
122 .await
123 .map_err(|e| FileReconstructionError::InternalError(format!("Join error: {e}")))??;
124
125 if let Some((file_terms, new_bytes, new_transfer_bytes)) = maybe_next_block {
126 let domain = file_terms
128 .first()
129 .and_then(|t| t.url_info.xorb_block_retrieval_urls.try_read().ok())
130 .and_then(|urls| {
131 urls.1
132 .first()
133 .and_then(|(url, _)| url::Url::parse(url).ok())
134 .and_then(|u| u.host_str().map(str::to_owned))
135 });
136
137 let block_start = file_terms.first().map(|t| t.byte_range.start).unwrap_or(0);
139 let block_end = file_terms.last().map(|t| t.byte_range.end).unwrap_or(0);
140
141 self.last_block_info = Some((Instant::now(), FileRange::new(block_start, block_end)));
143
144 self.current_active_byte_position = block_end;
146
147 info!(
148 file_hash = %self.file_hash,
149 domain = domain.as_deref().unwrap_or("unknown"),
150 block_start = block_start,
151 block_end = block_end,
152 block_size = file_terms.len(),
153 "Received block of file terms from prefetch queue"
154 );
155
156 if let Some(progress_updater) = &self.progress_updater {
157 self.total_bytes_reported = self.total_bytes_reported.saturating_add(new_bytes);
158 self.total_transfer_bytes_reported =
159 self.total_transfer_bytes_reported.saturating_add(new_transfer_bytes);
160 progress_updater.update_item_size(self.total_bytes_reported, false);
161 progress_updater.update_transfer_size(self.total_transfer_bytes_reported);
162 }
163
164 Ok(Some(file_terms))
165 } else {
166 self.known_final_byte_position
168 .store(self.prefetched_byte_position, Ordering::Relaxed);
169
170 if let Some(progress_updater) = &self.progress_updater {
171 progress_updater.update_item_size(self.total_bytes_reported, true);
172 }
173
174 info!(
175 file_hash = %self.file_hash,
176 prefetched_byte_position = self.prefetched_byte_position,
177 "Completed prefetch queue; end of file reached."
178 );
179
180 Ok(None)
181 }
182 }
183
184 fn is_done_fetching(&self) -> bool {
185 self.prefetched_byte_position >= self.known_final_byte_position.load(Ordering::Relaxed)
186 }
187
188 async fn check_prefetch_buffer(&mut self) -> Result<()> {
190 if self.is_done_fetching() {
192 return Ok(());
193 }
194
195 let target_completion_time = self.config.target_block_completion_time.as_secs_f64();
197
198 let completion_rate = self.completion_rate_estimator.value();
201
202 let prefetch_buffer_target_size = target_completion_time * completion_rate;
205
206 let min_prefetch_buffer_size = self.config.min_prefetch_buffer.as_u64() as f64;
208 let prefetch_buffer_size = prefetch_buffer_target_size.max(min_prefetch_buffer_size);
209
210 let current_prefetch_buffer_size = self.prefetched_byte_position - self.current_active_byte_position;
212
213 if !self.prefetch_queue.is_empty() && prefetch_buffer_size <= current_prefetch_buffer_size as f64 {
216 return Ok(());
217 }
218
219 let next_prefetch_target_block_size = (prefetch_buffer_size - current_prefetch_buffer_size as f64) as u64;
221
222 let min_fetch_size = self.config.min_reconstruction_fetch_size.as_u64();
223 let max_fetch_size = self.config.max_reconstruction_fetch_size.as_u64().max(min_fetch_size);
224 let next_prefetch_block_size = next_prefetch_target_block_size.clamp(min_fetch_size, max_fetch_size);
225
226 self.prefetch_block(next_prefetch_block_size).await
228 }
229
230 async fn prefetch_block(&mut self, block_size: u64) -> Result<()> {
231 let block_size = block_size.clamp(
232 self.config.min_reconstruction_fetch_size.as_u64(),
233 self.config.max_reconstruction_fetch_size.as_u64(),
234 );
235
236 let mut prefetch_block_range =
238 FileRange::new(self.prefetched_byte_position, self.prefetched_byte_position + block_size);
239
240 let last_byte_position = self
242 .known_final_byte_position
243 .load(Ordering::Relaxed)
244 .min(self.requested_byte_range.end);
245
246 if prefetch_block_range.end > last_byte_position {
248 prefetch_block_range.end = last_byte_position;
249 }
250
251 let min_fetch_size = self.config.min_reconstruction_fetch_size.as_u64();
253 if prefetch_block_range.end + min_fetch_size > self.requested_byte_range.end {
254 prefetch_block_range.end = self.requested_byte_range.end;
255 }
256
257 if prefetch_block_range.start >= prefetch_block_range.end {
260 debug!(
261 file_hash = %self.file_hash,
262 "Prefetch block skipped - already at or past end of requested range"
263 );
264 return Ok(());
265 }
266
267 let actual_block_size = prefetch_block_range.end - prefetch_block_range.start;
268 info!(
269 file_hash = %self.file_hash,
270 prefetch_range = ?(prefetch_block_range.start, prefetch_block_range.end),
271 requested_block_size = block_size,
272 actual_block_size,
273 queue_depth = self.prefetch_queue.len() + 1,
274 "Scheduling prefetch block"
275 );
276
277 self.prefetched_byte_position = prefetch_block_range.end;
279
280 let known_final_byte_position = self.known_final_byte_position.clone();
282 let client = self.client.clone();
283 let file_hash = self.file_hash;
284
285 let jh = tokio::task::spawn(async move {
286 let result = retrieve_file_term_block(client, file_hash, prefetch_block_range).await;
287
288 if let Ok(Some((ref returned_range, transfer_bytes, ref file_terms))) = result {
290 debug_assert_eq!(returned_range.start, prefetch_block_range.start);
293
294 if returned_range.end < prefetch_block_range.end {
295 known_final_byte_position.store(returned_range.end, Ordering::Relaxed);
296 }
297
298 let new_bytes = returned_range.end.saturating_sub(returned_range.start);
299 Ok(Some((file_terms.clone(), new_bytes, transfer_bytes)))
300 } else if let Ok(None) = result {
301 known_final_byte_position.fetch_min(prefetch_block_range.start, Ordering::Relaxed);
305 Ok(None)
306 } else {
307 result.map(|r| {
308 r.map(|(returned_range, transfer_bytes, file_terms)| {
309 let new_bytes = returned_range.end.saturating_sub(returned_range.start);
310 (file_terms, new_bytes, transfer_bytes)
311 })
312 })
313 }
314 });
315
316 self.prefetch_queue.push_back(jh);
317
318 Ok(())
319 }
320}