tur-rs 0.9.2

A relentless, high-concurrency download manager built for speed and efficiency. Tur uses dynamic work-stealing and aligned storage to saturate your bandwidth while maintaining a minuscule memory footprint. Inspired by the legends, built for the modern Rust ecosystem.
Documentation
use super::*;

impl ConnectionWorker {
    pub(super) async fn handle_range_retry(
        &self,
        range: &Rc<ActiveRange>,
        current_start: u64,
        current_end: u64,
        consecutive_failures: u32,
        reason: &str,
        retry_hint: RetryHint,
    ) -> Result<u32> {
        let next_failures = consecutive_failures.saturating_add(1);
        SchedulerMetrics::add(&self.metrics.retry_attempts, 1);

        if matches!(retry_hint, RetryHint::Abort) || next_failures > MAX_RANGE_RETRIES {
            return Err(anyhow!(
                "range#{} failed after {} retries at bytes {}..{}: {}",
                range.id,
                consecutive_failures,
                current_start,
                current_end,
                reason
            ));
        }

        let delay_ms = match retry_hint {
            RetryHint::Immediate => 0,
            RetryHint::Backoff(ms) => ms,
            RetryHint::ReduceWorkers(ms) => {
                let min_connections = self.scaler.config.borrow().min_connections.max(1);
                let active_connections = self.scaler.n_active.get();
                if active_connections > min_connections {
                    self.scaler.skip_growth_sample.set(true);
                    self.scaler
                        .slow_start_remaining
                        .set(self.scaler.slow_start_remaining.get().saturating_add(1));
                    self.worker_control.stop_requested.set(true);
                    self.relinquish_range(range, current_start).await;
                } else {
                    self.set_worker_state(
                        WorkerState::Retrying,
                        Some(format!(
                            "retry floor active={} min={}",
                            active_connections, min_connections
                        )),
                    );
                }
                ms
            }
            RetryHint::ShrinkRange(ms) => ms,
            RetryHint::Abort => 0,
        };
        SchedulerMetrics::add(&self.metrics.retry_wait_ms, delay_ms);
        self.log_msg(&format!(
            "{}; retry_hint={:?} retry {}/{} after {}ms on range#{} bytes={}..{}",
            reason,
            retry_hint,
            next_failures,
            MAX_RANGE_RETRIES,
            delay_ms,
            range.id,
            current_start,
            current_end
        ))
        .await;
        tokio::time::sleep(Duration::from_millis(delay_ms)).await;
        Ok(next_failures)
    }
}

pub(super) fn classify_http_status_retry(status: ::http::StatusCode) -> RetryHint {
    match status.as_u16() {
        429 => RetryHint::ReduceWorkers(RETRY_MAX_DELAY_MS),
        503 => RetryHint::Immediate,
        500..=599 => RetryHint::Backoff(RETRY_BASE_DELAY_MS.saturating_mul(2)),
        416 => RetryHint::Abort,
        _ => RetryHint::Backoff(RETRY_BASE_DELAY_MS),
    }
}

pub(super) fn classify_error_retry(reason: &str, made_progress: bool) -> RetryHint {
    let reason = reason.to_ascii_lowercase();
    if reason.contains("tls") || reason.contains("ssl") || reason.contains("handshake") {
        return RetryHint::ReduceWorkers(RETRY_MAX_DELAY_MS);
    }
    if reason.contains("connection reset") || reason.contains("broken pipe") {
        return if made_progress {
            RetryHint::ShrinkRange(RETRY_BASE_DELAY_MS)
        } else {
            RetryHint::ReduceWorkers(RETRY_BASE_DELAY_MS.saturating_mul(4))
        };
    }
    if reason.contains("timed out") || reason.contains("timeout") {
        return if made_progress {
            RetryHint::ShrinkRange(RETRY_BASE_DELAY_MS)
        } else {
            RetryHint::Backoff(RETRY_BASE_DELAY_MS.saturating_mul(3))
        };
    }
    RetryHint::Backoff(RETRY_BASE_DELAY_MS)
}