use super::*;
impl ConnectionWorker {
pub(super) async fn run_dry(self) -> Result<()> {
let mut current_range: Option<Rc<ActiveRange>> = None;
let mut prefetched_range: Option<Rc<ActiveRange>> = None;
let mut prefetch_handle: Option<JoinHandle<Result<Option<Rc<ActiveRange>>>>> = None;
let mut no_more_work_hint = false;
let mut range_started_at = Instant::now();
let mut range_start_cursor = 0_u64;
let mut local_cursor = 0_u64;
loop {
if self.control.is_halted() || self.should_exit_for_scale_down() {
if let Some(range) = current_range.take() {
self.relinquish_range(&range, local_cursor).await;
}
if let Some(range) = prefetched_range.take() {
self.relinquish_range(&range, range.cursor.get()).await;
}
if let Some(handle) = prefetch_handle.take() {
handle.abort();
}
self.set_worker_state(WorkerState::Paused, Some("halt requested".to_string()));
self.clear_worker_range();
break;
}
if self
.collect_prefetch_result(&mut prefetch_handle, &mut prefetched_range, false)
.await?
{
no_more_work_hint = true;
}
if current_range.is_none() {
self.set_worker_state(WorkerState::WaitingForWork, None);
self.clear_worker_range();
if let Some(range) = prefetched_range.take() {
SchedulerMetrics::add(&self.metrics.prefetch_hits, 1);
current_range = Some(range.clone());
range_started_at = Instant::now();
local_cursor = range.cursor.get();
range_start_cursor = local_cursor;
self.set_worker_state(WorkerState::Downloading, Some("prefetched".to_string()));
self.set_worker_range(&range, local_cursor);
} else {
current_range = self.request_work(false).await?;
if let Some(range) = ¤t_range {
range_started_at = Instant::now();
local_cursor = range.cursor.get();
range_start_cursor = local_cursor;
self.set_worker_state(WorkerState::Downloading, None);
self.set_worker_range(range, local_cursor);
} else {
self.set_worker_state(
WorkerState::Finished,
Some("no more work".to_string()),
);
break;
}
}
}
let range = current_range.as_ref().unwrap().clone();
let end = range.end.get();
if local_cursor >= end {
range.status.set(RANGE_STATUS_FINISHED);
self.set_worker_state(WorkerState::Finished, Some("range complete".to_string()));
self.clear_worker_range();
current_range = None;
continue;
}
let step = DRY_RUN_STEP_BYTES.min(end - local_cursor);
let new_pos = local_cursor + step;
range.cursor.set(new_pos);
self.update_range_speed_sample(&range, new_pos);
self.global_downloaded
.set(self.global_downloaded.get().saturating_add(step));
self.index_state.mark_completed_span(local_cursor, new_pos);
local_cursor = new_pos;
self.worker_control.transferred_bytes.set(
self.worker_control
.transferred_bytes
.get()
.saturating_add(step),
);
let recent_speed_bps =
estimate_speed_bps(range_started_at, range_start_cursor, new_pos);
self.worker_control
.diagnostics
.set_speed_bps(recent_speed_bps);
self.set_worker_state(WorkerState::Downloading, None);
self.set_worker_range(&range, local_cursor);
let remaining = end.saturating_sub(new_pos);
let prefetch_trigger_bytes = compute_prefetch_trigger_bytes(
remaining,
recent_speed_bps,
self.effective_prefetch_limit_bytes(),
self.scaler.last_protocol.get(),
self.ewma_connection_rtt_ms.get(),
);
SchedulerMetrics::update_max(
&self.metrics.max_prefetch_trigger_bytes,
prefetch_trigger_bytes,
);
if should_prefetch(
remaining,
recent_speed_bps,
self.effective_prefetch_limit_bytes(),
self.scaler.last_protocol.get(),
self.ewma_connection_rtt_ms.get(),
) && prefetch_handle.is_none()
&& prefetched_range.is_none()
&& !no_more_work_hint
{
SchedulerMetrics::add(&self.metrics.prefetch_requests, 1);
prefetch_handle = Some(self.spawn_prefetch_request());
}
tokio::time::sleep(Duration::from_millis(DRY_RUN_STEP_DELAY_MS)).await;
}
if let Some(handle) = prefetch_handle {
handle.abort();
}
if !self.control.is_halted() && !self.should_exit_for_scale_down() {
self.set_worker_state(WorkerState::Finished, Some("worker complete".to_string()));
self.clear_worker_range();
}
Ok(())
}
pub(super) fn spawn_prefetch_request(&self) -> JoinHandle<Result<Option<Rc<ActiveRange>>>> {
let coordinator_tx = self.coordinator_tx.clone();
let connection_id = self.connection_id;
let metrics = self.metrics.clone();
tokio::task::spawn_local(async move {
request_work_inner(coordinator_tx, connection_id, metrics).await
})
}
pub(super) async fn collect_prefetch_result(
&self,
handle: &mut Option<JoinHandle<Result<Option<Rc<ActiveRange>>>>>,
prefetched: &mut Option<Rc<ActiveRange>>,
wait_even_if_not_finished: bool,
) -> Result<bool> {
let should_take = handle
.as_ref()
.map(|task| wait_even_if_not_finished || task.is_finished())
.unwrap_or(false);
if should_take {
let result = handle.take().unwrap().await??;
if result.is_some() {
SchedulerMetrics::add(&self.metrics.prefetch_ready, 1);
}
let exhausted = result.is_none();
*prefetched = result;
return Ok(exhausted);
}
Ok(false)
}
pub(super) async fn request_work(&self, prefetched: bool) -> Result<Option<Rc<ActiveRange>>> {
if prefetched {
SchedulerMetrics::add(&self.metrics.prefetch_requests, 1);
}
self.log_msg(&format!("requesting_work prefetched={}", prefetched))
.await;
request_work_inner(
self.coordinator_tx.clone(),
self.connection_id,
self.metrics.clone(),
)
.await
}
}
async fn request_work_inner(
coordinator_tx: mpsc::Sender<WorkRequest>,
connection_id: u32,
metrics: Rc<SchedulerMetrics>,
) -> Result<Option<Rc<ActiveRange>>> {
let waited = Instant::now();
SchedulerMetrics::add(&metrics.work_requests, 1);
let (tx, rx) = oneshot::channel();
if coordinator_tx
.send(WorkRequest { connection_id, tx })
.await
.is_err()
{
return Ok(None);
}
let result = match rx.await {
Ok(range) => range,
Err(_) => None,
};
SchedulerMetrics::add(
&metrics.request_wait_ms,
waited.elapsed().as_millis() as u64,
);
Ok(result)
}