use std::{
cmp::Ordering,
collections::HashMap,
convert::TryFrom,
path::{Path, PathBuf},
time::{Duration, Instant},
};
use super::download::download_file;
use super::{CacheStrategy, TARGET};
use crate::{
CfgErr, NetErr,
app::{
MIRRORS_TTL_DAYS,
constants::ZIG_COMMUNITY_MIRRORS,
utils::{ProgressHandle, verify_checksum, zv_agent},
},
};
use chrono::{DateTime, Utc};
use color_eyre::eyre::Result;
use futures::{StreamExt, stream};
use reqwest::{Client, StatusCode};
use semver::Version;
use serde::{Deserialize, Serialize};
use url::Url;
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq)]
pub enum Layout {
Flat,
#[default]
Versioned,
}
impl std::ops::Not for Layout {
type Output = Self;
fn not(self) -> Self::Output {
match self {
Layout::Flat => Layout::Versioned,
Layout::Versioned => Layout::Flat,
}
}
}
impl From<&str> for Layout {
fn from(s: &str) -> Self {
match s {
"flat" => Layout::Flat,
"versioned" => Layout::Versioned,
_ => Layout::default(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Mirror {
pub base_url: Url,
pub layout: Layout,
pub rank: u8,
}
impl Mirror {
pub async fn download(
&self,
client: &reqwest::Client,
semver_version: &semver::Version,
zig_tarball: &str,
tarball_path: &Path,
minisig_path: &Path,
expected_shasum: Option<&str>,
expected_size: Option<u64>,
progress_handle: &ProgressHandle,
) -> Result<Layout, NetErr> {
const TARGET: &str = "zv::network::mirror::download";
tracing::debug!(target: TARGET, "Starting download with mirror: {} (rank: {})", self.base_url, self.rank);
match self
.try_download_with_layout(
client,
semver_version,
zig_tarball,
tarball_path,
minisig_path,
expected_shasum,
expected_size,
progress_handle,
false,
)
.await
{
Ok(layout) => Ok(layout),
Err(net_err) => {
if matches!(net_err, NetErr::HTTP(status) if status.as_u16() == 404) {
tracing::info!(target: TARGET,
"Initial layout failed with HTTP 404. Trying alternate layout for mirror {}...",
self.base_url);
return self
.try_download_with_layout(
client,
semver_version,
zig_tarball,
tarball_path,
minisig_path,
expected_shasum,
expected_size,
progress_handle,
true,
)
.await;
}
Err(net_err)
}
}
}
async fn try_download_with_layout(
&self,
client: &reqwest::Client,
semver_version: &semver::Version,
zig_tarball: &str,
tarball_path: &Path,
minisig_path: &Path,
expected_shasum: Option<&str>,
expected_size: Option<u64>,
progress_handle: &ProgressHandle,
use_alternate_layout: bool,
) -> Result<Layout, NetErr> {
const TARGET: &str = "zv::network::mirror::try_download_with_layout";
let mirror_for_download = if use_alternate_layout {
let mut alternate = self.clone();
alternate.layout = !alternate.layout;
alternate
} else {
self.clone()
};
let tarball_url = mirror_for_download.get_download_url(semver_version, zig_tarball);
let minisig_filename = format!("{}.minisig", zig_tarball);
let minisig_url = mirror_for_download.get_download_url(semver_version, &minisig_filename);
tracing::trace!(target: TARGET, "Download URLs configured:");
tracing::trace!(target: TARGET, " Tarball: {}", tarball_url);
tracing::trace!(target: TARGET, " Minisig: {}", minisig_url);
if let Some(size) = expected_size {
tracing::trace!(target: TARGET, " Expected size: {} bytes ({:.1} MB)", size, size as f64 / 1_048_576.0);
} else {
tracing::trace!(target: TARGET, " Expected size: unknown");
}
if let Some(shasum) = expected_shasum {
tracing::trace!(target: TARGET, " Expected checksum: {}", shasum);
} else {
tracing::trace!(target: TARGET, " Expected checksum: unknown");
}
let progress_msg = format!(
"Downloading {} from {}",
zig_tarball, mirror_for_download.base_url
);
match progress_handle.start(&progress_msg).await {
Ok(()) => {}
Err(e) => {
tracing::debug!(target: TARGET, "Failed to start progress reporting: {} - continuing without progress updates", e);
}
};
match download_file(
client,
&tarball_url,
tarball_path,
expected_size.unwrap_or(0),
progress_handle,
)
.await
{
Ok(()) => {
tracing::debug!(target: TARGET, "Proceeding to checksum verification...");
}
Err(net_err) => {
tracing::trace!(target: TARGET, "Tarball download failed from mirror {}: {}", mirror_for_download.base_url, net_err);
match net_err {
crate::NetErr::HTTP(status) => {
tracing::trace!(target: TARGET, "HTTP error {} during tarball download - mirror may be experiencing issues", status);
}
crate::NetErr::Timeout(_) => {
tracing::trace!(target: TARGET, "Timeout during tarball download - network or mirror performance issues");
}
_ => {
tracing::trace!(target: TARGET, "Network error during tarball download: {}", net_err);
}
}
return Err(net_err);
}
}
if let Some(shasum) = expected_shasum {
tracing::debug!(target: TARGET, "Verifying tarball integrity");
match verify_checksum(tarball_path, shasum).await {
Ok(()) => {
tracing::debug!(target: TARGET, "Checksum verification successful");
}
Err(e) => {
tracing::error!(target: TARGET, "Checksum verification failed for tarball from mirror {}: {}", mirror_for_download.base_url, e);
if tarball_path.exists() {
if let Err(cleanup_err) = tokio::fs::remove_file(tarball_path).await {
tracing::warn!(target: TARGET, "Failed to remove corrupted tarball file: {}", cleanup_err);
} else {
tracing::debug!(target: TARGET, "Removed corrupted tarball file");
}
}
return Err(NetErr::Checksum(e.into()));
}
}
} else {
tracing::debug!(target: TARGET, "Skipping checksum verification - no expected checksum provided");
}
tracing::debug!(target: TARGET, "Downloading signature file from {}", minisig_url);
match progress_handle
.update("Downloading signature file...")
.await
{
Ok(()) => {
tracing::debug!(target: TARGET, "Progress updated for minisig download");
}
Err(e) => {
tracing::warn!(target: TARGET, "Failed to update progress for minisig download: {} - continuing", e);
}
}
match download_file(client, &minisig_url, minisig_path, 0, progress_handle).await {
Ok(()) => {
tracing::debug!(target: TARGET, "Minisig download completed successfully");
}
Err(net_err) => {
tracing::error!(target: TARGET, "Minisig download failed from mirror {}: {}", mirror_for_download.base_url, net_err);
match net_err {
NetErr::HTTP(status) => {
tracing::error!(target: TARGET, "HTTP error {} during minisig download - signature file may not exist on this mirror", status);
}
NetErr::Timeout(_) => {
tracing::error!(target: TARGET, "Timeout during minisig download - network or mirror performance issues");
}
_ => {
tracing::error!(target: TARGET, "Network error during minisig download: {}", net_err);
}
}
if tarball_path.exists() {
if let Err(cleanup_err) = tokio::fs::remove_file(tarball_path).await {
tracing::trace!(target: TARGET, "Failed to remove tarball after minisig failure: {}", cleanup_err);
} else {
tracing::trace!(target: TARGET, "Cleaned up tarball after minisig download failure");
}
}
return Err(net_err);
}
}
let tarball_size = match tokio::fs::metadata(tarball_path).await {
Ok(metadata) => {
let size = metadata.len();
tracing::debug!(target: TARGET, "Final tarball size: {} bytes ({:.1} MB)", size, size as f64 / 1_048_576.0);
if let Some(expected) = expected_size {
if size != expected {
tracing::warn!(target: TARGET, "Tarball size {} doesn't match expected size {} - this may indicate an issue", size, expected);
}
} else {
tracing::debug!(target: TARGET, "No expected size provided for verification");
}
size
}
Err(e) => {
tracing::error!(target: TARGET, "Failed to verify final tarball file: {}", e);
return Err(NetErr::FileIo(e));
}
};
let minisig_size = match tokio::fs::metadata(minisig_path).await {
Ok(metadata) => {
let size = metadata.len();
tracing::debug!(target: TARGET, "Final minisig size: {} bytes", size);
if size == 0 {
tracing::warn!(target: TARGET, "Minisig file is empty - this may indicate a download issue");
} else if size > 1024 {
tracing::warn!(target: TARGET, "Minisig file is unusually large ({} bytes) - this may indicate an error page was downloaded", size);
}
size
}
Err(e) => {
tracing::error!(target: TARGET, "Failed to verify final minisig file: {}", e);
return Err(NetErr::FileIo(e));
}
};
tracing::debug!(target: TARGET, "Download attempt completed successfully with mirror {} - tarball: {:.1} MB, minisig: {} bytes",
self.base_url, tarball_size as f64 / 1_048_576.0, minisig_size);
Ok(mirror_for_download.layout)
}
pub fn get_download_url(&self, version: &Version, tarball: &str) -> String {
match self.layout {
Layout::Flat => format!(
"{}/{tarball}?source={}",
self.base_url.to_string().trim_end_matches('/'),
zv_agent()
),
Layout::Versioned => format!(
"{}/{}/{}?source={}",
self.base_url.to_string().trim_end_matches('/'),
version,
tarball,
zv_agent()
),
}
}
pub fn get_alternate_url(&self, version: &Version, tarball: &str) -> String {
let alternate = Mirror {
base_url: self.base_url.clone(),
layout: !self.layout,
rank: self.rank,
};
alternate.get_download_url(version, tarball)
}
pub fn promote(&mut self) {
if self.rank > 1 {
self.rank -= 1;
}
}
pub fn demote(&mut self) {
self.rank = self.rank.saturating_add(1);
}
}
impl TryFrom<&str> for Mirror {
type Error = url::ParseError;
fn try_from(input: &str) -> Result<Self, Self::Error> {
let url_str = if input.starts_with("http://") || input.starts_with("https://") {
input.to_string()
} else {
format!("https://{input}")
};
let base_url = Url::parse(&url_str)?;
match base_url.scheme() {
"http" | "https" => {}
_ => return Err(url::ParseError::RelativeUrlWithoutBase),
}
let layout = match base_url.as_str() {
u if u.contains("zig.florent.dev") => Layout::Flat,
u if u.contains("zig.squirl.dev") => Layout::Flat,
u if u.contains("zigmirror.meox.dev") => Layout::Flat,
u if u.contains("zig-mirror.tsimnet.eu") => Layout::Flat,
u if u.contains("pkg.earth") => Layout::Flat,
u if u.contains("ziglang.freetls.fastly.net") => Layout::Flat,
u if u.contains("zig.tilok.dev") => Layout::Flat,
_ => Layout::Versioned,
};
Ok(Mirror {
layout,
base_url,
rank: 1,
})
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MirrorsIndex {
pub mirrors: Vec<Mirror>,
pub last_synced: DateTime<Utc>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RankApplyPolicy {
Overwrite,
Blend,
}
#[derive(Debug, Clone, Serialize)]
pub struct MirrorBenchmarkResult {
pub base_url: Url,
pub old_rank: u8,
pub old_layout: Layout,
#[serde(skip_serializing_if = "Option::is_none")]
pub measured_layout: Option<Layout>,
#[serde(skip_serializing_if = "Option::is_none")]
pub bytes_read: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub elapsed_ms: Option<u128>,
#[serde(skip_serializing_if = "Option::is_none")]
pub bytes_per_second: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
impl MirrorBenchmarkResult {
pub fn is_success(&self) -> bool {
self.bytes_per_second.is_some()
}
}
#[derive(Debug)]
enum BenchmarkProbeError {
Http(StatusCode),
Network(reqwest::Error),
EmptyBody,
}
impl std::fmt::Display for BenchmarkProbeError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Http(status) => write!(f, "HTTP {}", status),
Self::Network(err) if err.is_timeout() => write!(f, "request timed out"),
Self::Network(err) => write!(f, "{err}"),
Self::EmptyBody => write!(f, "response body was empty"),
}
}
}
impl MirrorsIndex {
pub fn new(mirrors: Vec<Mirror>) -> Self {
Self {
mirrors,
last_synced: Utc::now(),
}
}
pub fn is_expired(&self) -> bool {
self.last_synced + chrono::Duration::days(*MIRRORS_TTL_DAYS) < Utc::now()
}
pub async fn load_from_disk(path: impl AsRef<Path>) -> Result<Self, CfgErr> {
let content = tokio::fs::read_to_string(path.as_ref())
.await
.map_err(|io_err| CfgErr::NotFound(io_err.into()))?;
toml::from_str::<Self>(&content).map_err(|e| CfgErr::ParseFail(e.into()))
}
#[allow(unused)]
pub async fn load_from_disk_expire_checked(path: impl AsRef<Path>) -> Result<Self, CfgErr> {
let index = Self::load_from_disk(path.as_ref()).await?;
if index.is_expired() {
return Err(CfgErr::CacheExpired(
path.as_ref().to_string_lossy().to_string(),
));
}
Ok(index)
}
pub async fn save(&self, path: impl AsRef<Path>) -> Result<(), CfgErr> {
let content = toml::to_string_pretty(self).map_err(CfgErr::SerializeFail)?;
tokio::fs::write(path, content)
.await
.map_err(|io_err| CfgErr::WriteFail(io_err.into(), String::from("mirrors index")))?;
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct MirrorManager {
client: Client,
mirrors: Vec<Mirror>,
mirrors_index: Option<MirrorsIndex>,
cache_path: PathBuf,
}
impl MirrorManager {
pub fn new(cache_path: impl AsRef<Path>) -> Result<Self> {
Ok(Self {
client: super::create_client()?,
mirrors: Vec::with_capacity(7), mirrors_index: None,
cache_path: cache_path.as_ref().to_path_buf(),
})
}
pub async fn init_and_load(
cache_path: impl AsRef<Path>,
cache_strategy: CacheStrategy,
) -> Result<Self, NetErr> {
let mut manager = Self::new(cache_path)?;
manager.load_mirrors(cache_strategy).await?;
Ok(manager)
}
pub async fn load_mirrors(&mut self, cache_strategy: CacheStrategy) -> Result<(), NetErr> {
match cache_strategy {
CacheStrategy::AlwaysRefresh => {
self.refresh_from_network().await?;
}
CacheStrategy::PreferCache => {
if self.try_load_index_from_cache().await.is_err() {
tracing::warn!(target: TARGET, "Failed to load cached mirrors, fetching from network");
self.refresh_from_network().await?;
}
}
CacheStrategy::OnlyCache => {
if self.try_load_index_from_cache().await.is_err() {
tracing::warn!(target: TARGET, "mirrors cache not found. OnlyCache strategy... returning EmptyMirrors");
return Err(NetErr::EmptyMirrors);
}
}
CacheStrategy::RespectTtl => match self.try_load_index_from_cache().await {
Ok(()) => {
if self.is_cache_expired() {
tracing::debug!(target: TARGET, "Mirrors cache expired, refreshing");
self.refresh_from_network().await?;
} else {
tracing::debug!(target: TARGET, "Using cached mirrors");
self.apply_cached_mirrors_index();
}
}
Err(_) => {
tracing::debug!(target: TARGET, "No valid cache, fetching from network");
self.refresh_from_network().await?;
}
},
}
Ok(())
}
async fn try_load_index_from_cache(&mut self) -> Result<(), NetErr> {
let index = MirrorsIndex::load_from_disk(&self.cache_path)
.await
.map_err(|err| {
tracing::debug!(target: TARGET, "Failed to load mirrors cache from disk: {err}");
NetErr::EmptyMirrors
})?;
self.mirrors_index = Some(index);
Ok(())
}
fn apply_cached_mirrors_index(&mut self) {
if let Some(ref index) = self.mirrors_index {
self.mirrors = index.mirrors.clone();
}
}
async fn refresh_from_network(&mut self) -> Result<(), NetErr> {
let fresh_mirrors = self.fetch_network_mirrors().await?;
let merged_mirrors = match MirrorsIndex::load_from_disk(&self.cache_path).await {
Ok(cached_index) => {
let cached_mirrors_map: std::collections::HashMap<String, Mirror> = cached_index
.mirrors
.into_iter()
.map(|m| (m.base_url.to_string(), m))
.collect();
let merged: Vec<Mirror> = fresh_mirrors
.into_iter()
.map(|mut fresh_mirror| {
if let Some(cached_mirror) =
cached_mirrors_map.get(fresh_mirror.base_url.as_str())
{
fresh_mirror.layout = cached_mirror.layout;
fresh_mirror.rank = cached_mirror.rank;
}
fresh_mirror
})
.collect();
tracing::debug!(target: TARGET, "Merged layouts and ranks from {} cached mirrors into {} fresh mirrors",
cached_mirrors_map.len(), merged.len());
merged
}
Err(_) => {
tracing::debug!(target: TARGET, "No cached mirrors found, using fresh mirrors from network");
fresh_mirrors
}
};
self.mirrors = merged_mirrors;
let index = MirrorsIndex::new(self.mirrors.clone());
if let Err(e) = index.save(&self.cache_path).await {
tracing::error!(target: TARGET, "Failed to save mirrors cache: {}", e);
}
self.mirrors_index = Some(index);
Ok(())
}
async fn fetch_network_mirrors(&self) -> Result<Vec<Mirror>, NetErr> {
tracing::debug!(target: TARGET, "Fetching mirrors from {}", ZIG_COMMUNITY_MIRRORS);
let mirrors: Vec<Mirror> = self
.client
.get(ZIG_COMMUNITY_MIRRORS)
.send()
.await
.map_err(NetErr::Reqwest)?
.text()
.await
.map_err(NetErr::Reqwest)?
.lines()
.filter(|line| !line.trim().is_empty()) .filter_map(|line| {
Mirror::try_from(line.trim())
.inspect_err(|&e| {
tracing::warn!(target: TARGET, "Failed to parse mirror '{}': {}", line, e);
})
.ok()
})
.collect();
if mirrors.is_empty() {
tracing::error!(target: TARGET, "No valid mirrors found in response");
return Err(NetErr::EmptyMirrors);
}
tracing::debug!(target: TARGET, "Successfully fetched {} mirrors", mirrors.len());
Ok(mirrors)
}
async fn ensure_mirrors_loaded(&mut self) -> Result<(), NetErr> {
if self.mirrors_index.is_none() {
match MirrorsIndex::load_from_disk(&self.cache_path).await {
Ok(index) => {
self.mirrors_index = Some(index);
}
Err(_) => {
self.refresh_from_network().await?;
}
}
}
if self.mirrors.is_empty() {
self.apply_cached_mirrors_index();
}
Ok(())
}
#[inline]
fn is_cache_expired(&self) -> bool {
match &self.mirrors_index {
Some(index) => index.is_expired(),
None => true, }
}
pub async fn all_mirrors_mut(&mut self) -> Result<&mut [Mirror], NetErr> {
if self.mirrors.is_empty() {
self.ensure_mirrors_loaded().await?;
}
Ok(&mut self.mirrors)
}
pub async fn get_random_mirror(&mut self) -> Result<&mut Mirror, NetErr> {
use rand::Rng;
let mirrors = self.all_mirrors_mut().await?;
if mirrors.is_empty() {
return Err(NetErr::EmptyMirrors);
}
if mirrors.len() == 1 {
return Ok(&mut mirrors[0]);
}
let weights: Vec<f64> = mirrors
.iter()
.map(|m| 1.0f64 / m.rank as f64) .collect();
let mut rng = rand::rng();
let total_weight: f64 = weights.iter().sum();
let mut random_weight = rng.random::<f64>() * total_weight;
for (i, &weight) in weights.iter().enumerate() {
random_weight -= weight;
if random_weight <= 0.0 {
return Ok(&mut mirrors[i]);
}
}
Ok(&mut mirrors[0])
}
pub async fn sort_by_rank(&mut self) -> Result<&mut Vec<Mirror>, NetErr> {
let mirrors = self.all_mirrors_mut().await?;
mirrors.sort_by_key(|m| m.rank);
Ok(&mut self.mirrors)
}
pub async fn save_index_to_disk(&mut self) -> Result<(), NetErr> {
if self.mirrors.is_empty() {
tracing::debug!(target: TARGET, "No mirrors loaded, cannot save index to disk");
Err(NetErr::EmptyMirrors)?;
}
let index = MirrorsIndex::new(self.mirrors.clone());
index.save(&self.cache_path).await.map_err(|cfg_err| {
tracing::error!(target: TARGET, "Failed to save mirrors index to disk: {}", cfg_err);
NetErr::Other(cfg_err.into())
})?;
self.mirrors_index = Some(index);
tracing::debug!(target: TARGET, "Successfully saved mirrors index to {}", self.cache_path.display());
Ok(())
}
pub async fn benchmark_mirrors(
&mut self,
semver_version: &Version,
zig_tarball: &str,
sample_size: u64,
concurrency: usize,
) -> Result<Vec<MirrorBenchmarkResult>, NetErr> {
let mirrors = self.all_mirrors_mut().await?.to_vec();
if mirrors.is_empty() {
return Err(NetErr::EmptyMirrors);
}
let sample_size = sample_size.max(1);
let concurrency = concurrency.max(1);
let client = self.client.clone();
let semver_version = semver_version.clone();
let zig_tarball = zig_tarball.to_string();
let mut results = stream::iter(mirrors.into_iter().map(|mirror| {
let client = client.clone();
let semver_version = semver_version.clone();
let zig_tarball = zig_tarball.clone();
async move {
benchmark_single_mirror(client, mirror, semver_version, zig_tarball, sample_size)
.await
}
}))
.buffer_unordered(concurrency)
.collect::<Vec<_>>()
.await;
results.sort_by(|a, b| {
benchmark_result_sort_key(a, b)
.then_with(|| a.base_url.as_str().cmp(b.base_url.as_str()))
});
Ok(results)
}
pub async fn apply_benchmark_results(
&mut self,
results: &[MirrorBenchmarkResult],
policy: RankApplyPolicy,
) -> Result<(), NetErr> {
self.all_mirrors_mut().await?;
let ordered = ordered_benchmark_results(results, policy);
let rank_by_url: HashMap<String, u8> = ordered
.iter()
.enumerate()
.map(|(idx, result)| (result.base_url.to_string(), rank_for_index(idx)))
.collect();
let layout_by_url: HashMap<String, Layout> = results
.iter()
.filter_map(|result| {
result
.measured_layout
.map(|layout| (result.base_url.to_string(), layout))
})
.collect();
for mirror in &mut self.mirrors {
if let Some(rank) = rank_by_url.get(mirror.base_url.as_str()) {
mirror.rank = *rank;
}
if let Some(layout) = layout_by_url.get(mirror.base_url.as_str()) {
mirror.layout = *layout;
}
}
self.mirrors.sort_by(|a, b| {
a.rank
.cmp(&b.rank)
.then_with(|| a.base_url.cmp(&b.base_url))
});
self.save_index_to_disk().await
}
}
async fn benchmark_single_mirror(
client: Client,
mirror: Mirror,
semver_version: Version,
zig_tarball: String,
sample_size: u64,
) -> MirrorBenchmarkResult {
let old_rank = mirror.rank;
let old_layout = mirror.layout;
match probe_mirror_layout(&client, &mirror, &semver_version, &zig_tarball, sample_size).await {
Ok(measurement) => benchmark_success(mirror.base_url, old_rank, old_layout, measurement),
Err(BenchmarkProbeError::Http(status)) if status.as_u16() == 404 => {
let mut alternate = mirror.clone();
alternate.layout = !alternate.layout;
match probe_mirror_layout(
&client,
&alternate,
&semver_version,
&zig_tarball,
sample_size,
)
.await
{
Ok(measurement) => {
benchmark_success(mirror.base_url, old_rank, old_layout, measurement)
}
Err(err) => benchmark_failure(mirror.base_url, old_rank, old_layout, err),
}
}
Err(err) => benchmark_failure(mirror.base_url, old_rank, old_layout, err),
}
}
struct BenchmarkMeasurement {
layout: Layout,
bytes_read: u64,
elapsed: Duration,
}
async fn probe_mirror_layout(
client: &Client,
mirror: &Mirror,
semver_version: &Version,
zig_tarball: &str,
sample_size: u64,
) -> Result<BenchmarkMeasurement, BenchmarkProbeError> {
let url = mirror.get_download_url(semver_version, zig_tarball);
let range_end = sample_size.saturating_sub(1);
let start = Instant::now();
let response = client
.get(url)
.header(reqwest::header::RANGE, format!("bytes=0-{range_end}"))
.timeout(Duration::from_secs(15))
.send()
.await
.map_err(BenchmarkProbeError::Network)?;
match response.status() {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {}
status => return Err(BenchmarkProbeError::Http(status)),
}
let mut stream = response.bytes_stream();
let mut bytes_read = 0u64;
while let Some(chunk) = stream.next().await {
let chunk = chunk.map_err(BenchmarkProbeError::Network)?;
let remaining = sample_size.saturating_sub(bytes_read);
if remaining == 0 {
break;
}
bytes_read += (chunk.len() as u64).min(remaining);
if bytes_read >= sample_size {
break;
}
}
if bytes_read == 0 {
return Err(BenchmarkProbeError::EmptyBody);
}
Ok(BenchmarkMeasurement {
layout: mirror.layout,
bytes_read,
elapsed: start.elapsed(),
})
}
fn benchmark_success(
base_url: Url,
old_rank: u8,
old_layout: Layout,
measurement: BenchmarkMeasurement,
) -> MirrorBenchmarkResult {
let elapsed_secs = measurement.elapsed.as_secs_f64().max(0.001);
MirrorBenchmarkResult {
base_url,
old_rank,
old_layout,
measured_layout: Some(measurement.layout),
bytes_read: Some(measurement.bytes_read),
elapsed_ms: Some(measurement.elapsed.as_millis()),
bytes_per_second: Some(measurement.bytes_read as f64 / elapsed_secs),
error: None,
}
}
fn benchmark_failure(
base_url: Url,
old_rank: u8,
old_layout: Layout,
err: BenchmarkProbeError,
) -> MirrorBenchmarkResult {
MirrorBenchmarkResult {
base_url,
old_rank,
old_layout,
measured_layout: None,
bytes_read: None,
elapsed_ms: None,
bytes_per_second: None,
error: Some(err.to_string()),
}
}
fn benchmark_result_sort_key(a: &MirrorBenchmarkResult, b: &MirrorBenchmarkResult) -> Ordering {
match (a.bytes_per_second, b.bytes_per_second) {
(Some(a_bps), Some(b_bps)) => b_bps.partial_cmp(&a_bps).unwrap_or(Ordering::Equal),
(Some(_), None) => Ordering::Less,
(None, Some(_)) => Ordering::Greater,
(None, None) => a.old_rank.cmp(&b.old_rank),
}
}
fn ordered_benchmark_results(
results: &[MirrorBenchmarkResult],
policy: RankApplyPolicy,
) -> Vec<&MirrorBenchmarkResult> {
match policy {
RankApplyPolicy::Overwrite => {
let mut ordered = results.iter().collect::<Vec<_>>();
ordered.sort_by(|a, b| {
benchmark_result_sort_key(a, b)
.then_with(|| a.base_url.as_str().cmp(b.base_url.as_str()))
});
ordered
}
RankApplyPolicy::Blend => {
let mut successes = results
.iter()
.filter(|result| result.is_success())
.collect::<Vec<_>>();
successes.sort_by(|a, b| {
benchmark_result_sort_key(a, b)
.then_with(|| a.base_url.as_str().cmp(b.base_url.as_str()))
});
let speed_rank_by_url: HashMap<String, usize> = successes
.iter()
.enumerate()
.map(|(idx, result)| (result.base_url.to_string(), idx + 1))
.collect();
let mut failures = results
.iter()
.filter(|result| !result.is_success())
.collect::<Vec<_>>();
failures.sort_by(|a, b| {
a.old_rank
.cmp(&b.old_rank)
.then_with(|| a.base_url.as_str().cmp(b.base_url.as_str()))
});
successes.sort_by(|a, b| {
let a_speed_rank = speed_rank_by_url
.get(a.base_url.as_str())
.copied()
.unwrap_or(usize::MAX);
let b_speed_rank = speed_rank_by_url
.get(b.base_url.as_str())
.copied()
.unwrap_or(usize::MAX);
let a_score = a.old_rank as f64 + a_speed_rank as f64;
let b_score = b.old_rank as f64 + b_speed_rank as f64;
a_score
.partial_cmp(&b_score)
.unwrap_or(Ordering::Equal)
.then_with(|| a.base_url.as_str().cmp(b.base_url.as_str()))
});
successes.extend(failures);
successes
}
}
}
fn rank_for_index(idx: usize) -> u8 {
u8::try_from(idx + 1).unwrap_or(u8::MAX)
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
use tempfile::tempdir;
use wiremock::{
Mock, MockServer, ResponseTemplate,
matchers::{header, method, path},
};
fn test_mirror(url: &str, rank: u8) -> Mirror {
Mirror {
base_url: Url::parse(url).unwrap(),
layout: Layout::Flat,
rank,
}
}
fn benchmark_result(
url: &str,
old_rank: u8,
bytes_per_second: Option<f64>,
) -> MirrorBenchmarkResult {
MirrorBenchmarkResult {
base_url: Url::parse(url).unwrap(),
old_rank,
old_layout: Layout::Flat,
measured_layout: bytes_per_second.map(|_| Layout::Flat),
bytes_read: bytes_per_second.map(|_| 1024),
elapsed_ms: bytes_per_second.map(|_| 10),
bytes_per_second,
error: if bytes_per_second.is_some() {
None
} else {
Some("failed".to_string())
},
}
}
#[tokio::test]
async fn overwrite_policy_sets_speed_order_and_places_failures_last() {
let dir = tempdir().unwrap();
let cache_path = dir.path().join("mirrors.toml");
let mut manager = MirrorManager::new(&cache_path).unwrap();
manager.mirrors = vec![
test_mirror("https://slow.example", 1),
test_mirror("https://fast.example", 5),
test_mirror("https://failed.example", 2),
];
manager.mirrors_index = Some(MirrorsIndex {
mirrors: manager.mirrors.clone(),
last_synced: Utc::now(),
});
let results = vec![
benchmark_result("https://slow.example/", 1, Some(10.0)),
benchmark_result("https://fast.example/", 5, Some(100.0)),
benchmark_result("https://failed.example/", 2, None),
];
manager
.apply_benchmark_results(&results, RankApplyPolicy::Overwrite)
.await
.unwrap();
let ranks = manager
.mirrors
.iter()
.map(|m| (m.base_url.as_str().to_string(), m.rank))
.collect::<HashMap<_, _>>();
assert_eq!(ranks["https://fast.example/"], 1);
assert_eq!(ranks["https://slow.example/"], 2);
assert_eq!(ranks["https://failed.example/"], 3);
assert!(cache_path.is_file());
}
#[test]
fn blend_policy_keeps_existing_rank_signal() {
let results = vec![
benchmark_result("https://current-best.example/", 1, Some(50.0)),
benchmark_result("https://fast-but-low-priority.example/", 10, Some(100.0)),
benchmark_result("https://middle.example/", 5, Some(75.0)),
];
let ordered = ordered_benchmark_results(&results, RankApplyPolicy::Blend);
let urls = ordered
.iter()
.map(|result| result.base_url.as_str())
.collect::<Vec<_>>();
assert_eq!(
urls,
vec![
"https://current-best.example/",
"https://middle.example/",
"https://fast-but-low-priority.example/"
]
);
}
#[tokio::test]
async fn benchmark_uses_range_requests_and_records_throughput() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/zig.tar.xz"))
.and(header("range", "bytes=0-9"))
.respond_with(ResponseTemplate::new(206).set_body_bytes(vec![7; 10]))
.mount(&server)
.await;
let cache_path = tempdir().unwrap().path().join("mirrors.toml");
let mut manager = MirrorManager::new(cache_path).unwrap();
manager.mirrors = vec![test_mirror(&server.uri(), 1)];
manager.mirrors_index = Some(MirrorsIndex {
mirrors: manager.mirrors.clone(),
last_synced: Utc::now(),
});
let results = manager
.benchmark_mirrors(&Version::new(0, 15, 1), "zig.tar.xz", 10, 1)
.await
.unwrap();
assert_eq!(results.len(), 1);
assert!(results[0].is_success());
assert_eq!(results[0].bytes_read, Some(10));
assert_eq!(results[0].measured_layout, Some(Layout::Flat));
}
#[tokio::test]
async fn benchmark_tries_alternate_layout_after_404() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/0.15.1/zig.tar.xz"))
.respond_with(ResponseTemplate::new(404))
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path("/zig.tar.xz"))
.respond_with(ResponseTemplate::new(206).set_body_bytes(vec![7; 10]))
.mount(&server)
.await;
let cache_path = tempdir().unwrap().path().join("mirrors.toml");
let mut manager = MirrorManager::new(cache_path).unwrap();
let mut mirror = test_mirror(&server.uri(), 1);
mirror.layout = Layout::Versioned;
manager.mirrors = vec![mirror];
manager.mirrors_index = Some(MirrorsIndex {
mirrors: manager.mirrors.clone(),
last_synced: Utc::now(),
});
let results = manager
.benchmark_mirrors(&Version::new(0, 15, 1), "zig.tar.xz", 10, 1)
.await
.unwrap();
assert!(results[0].is_success());
assert_eq!(results[0].old_layout, Layout::Versioned);
assert_eq!(results[0].measured_layout, Some(Layout::Flat));
assert_eq!(manager.mirrors[0].layout, Layout::Versioned);
}
}