use anyhow::Result;
use arch_mirrors_rs::{Mirror, Protocol, Status};
use clap::{ArgAction, Args, Parser, ValueEnum, value_parser};
use clap_verbosity_flag::Verbosity;
use futures_util::StreamExt;
use jiff::{Span, Timestamp};
use regex::Regex;
use reqwest::Url;
use std::cmp::{Ordering, Reverse};
use std::collections::HashMap;
use std::ffi::OsString;
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use xdg::BaseDirectories;
/// Arch Linux mirror status JSON endpoint; used as the default for `--url`.
const URL: &str = "https://archlinux.org/mirrors/status/json/";
/// Default for `--connection-timeout`, in seconds.
const DEFAULT_CONNECTION_TIMEOUT: u64 = 5;
/// Default for `--download-timeout`, in seconds.
const DEFAULT_DOWNLOAD_TIMEOUT: u64 = 5;
/// Default for `--cache-timeout`: maximum age, in seconds, of the cached
/// mirror status before it is downloaded again.
const DEFAULT_CACHE_TIMEOUT: u64 = 300;
// Keys by which the mirror list can be ordered (`--sort`).
//
// NOTE: plain `//` comments are used on purpose. clap's derive macros turn
// `///` doc comments into user-facing help text, so adding doc comments here
// would change the program's `--help` output.
#[derive(Debug, ValueEnum, Clone, Copy, PartialEq)]
#[allow(
clippy::doc_markdown,
reason = "This is used to generate the user facing help."
)]
enum SortType {
// Order by the mirror's `last_sync` timestamp.
Age,
// Order by the download rate measured by `rate_status`.
Rate,
// Order by the mirror's country name.
Country,
// Order by the mirror's `score` field.
Score,
// Order by the mirror's `delay` field.
Delay,
}
// Top-level command-line interface.
//
// NOTE: plain `//` comments are used on purpose. clap's derive macros turn
// `///` doc comments into user-facing help text, so adding doc comments here
// would change the program's `--help` output.
#[derive(Parser, Debug)]
#[allow(
clippy::doc_markdown,
reason = "This is used to generate the user facing help."
)]
#[command(
about,
author,
version,
propagate_version = true,
next_line_help = false,
disable_help_subcommand = true
)]
struct Cli {
// URL the mirror status JSON is downloaded from; also reported as the
// origin ("From") in the generated mirrorlist header.
#[arg(long, default_value = URL)]
url: String,
// When set, `run` prints the per-country mirror counts and exits without
// producing a mirrorlist.
#[arg(long)]
list_countries: bool,
// Verbosity flags from `clap_verbosity_flag`.
// NOTE(review): not read by any code visible in this file.
#[clap(flatten)]
verbose: Verbosity,
// Options controlling retrieval, filtering, sorting and output.
#[command(flatten)]
run: RunOptions,
}
// Options that control how mirrors are retrieved, rated, sorted and written.
//
// NOTE: plain `//` comments are used on purpose. clap's derive macros turn
// `///` doc comments into user-facing help text, so adding doc comments here
// would change the program's `--help` output.
#[derive(Debug, Args)]
#[allow(
clippy::doc_markdown,
reason = "This is used to generate the user facing help."
)]
struct RunOptions {
// Connection timeout in seconds; applied to the HTTP client and passed to
// rsync as `--contimeout` when rating mirrors.
#[arg(long, default_value_t = DEFAULT_CONNECTION_TIMEOUT, value_name = "n")]
connection_timeout: u64,
// Overall request timeout, in seconds, applied to the HTTP client.
#[arg(long, default_value_t = DEFAULT_DOWNLOAD_TIMEOUT, value_name = "n")]
download_timeout: u64,
// Maximum age, in seconds, of the cached mirror status before it is
// downloaded again.
#[arg(long, default_value_t = DEFAULT_CACHE_TIMEOUT, value_name = "n")]
cache_timeout: u64,
// File the output is written to; standard output is used when absent.
#[arg(long, value_name = "filepath")]
save: Option<String>,
// Key used to order the resulting mirror list.
#[arg(long)]
sort: Option<SortType>,
// Number of mirrors rated concurrently and number of runtime worker
// threads; values below 1 are treated as 1 by the code that reads this.
#[arg(long, default_value_t = 0)]
threads: usize,
// Print detailed per-mirror information instead of a mirrorlist.
#[arg(long, default_value_t = false)]
info: bool,
// Conditions a mirror must satisfy to be kept.
#[command(flatten)]
filters: Filters,
}
// Mirror filters; a mirror is kept only if it satisfies every filter given.
//
// NOTE: plain `//` comments are used on purpose. clap's derive macros turn
// `///` doc comments into user-facing help text, so adding doc comments here
// would change the program's `--help` output.
#[derive(Parser, Debug)]
#[command(
next_help_heading = "filters\n\nThe following filters are inclusive, i.e. the returned list will only contain mirrors for which all of the given conditions are met.\n"
)]
struct Filters {
// Maximum time, in hours, since a mirror's last synchronisation.
#[arg(long, short, value_name = "n")]
age: Option<f32>,
// Maximum mirror delay, in hours; `filter_status` multiplies it by 3600
// before comparing it with the mirror's `delay` value.
#[arg(long, value_name = "n")]
delay: Option<f32>,
// Countries to keep, matched case-insensitively against either the country
// name or the country code; `*` matches every country.
#[arg(long, short, value_name = "country name or code", value_delimiter=',', action = ArgAction::Append)]
country: Vec<String>,
// Rate the mirrors and keep only the first `n` after sorting by rate.
#[arg(long, short, value_name = "n")]
fastest: Option<usize>,
// Keep only mirrors whose URL matches at least one of these expressions.
#[arg(long, short, value_name = "regex", action = ArgAction::Append)]
include: Vec<Regex>,
// Drop mirrors whose URL matches any of these expressions.
#[arg(long, short, value_name = "regex", action = ArgAction::Append)]
exclude: Vec<Regex>,
// Keep only the first `n` mirrors after sorting by `SortType::Age`.
#[arg(long, short, value_name = "n")]
latest: Option<usize>,
// Keep only the first `n` mirrors after sorting by `SortType::Score`.
#[arg(long, value_name = "n")]
score: Option<usize>,
// Truncate the final list to at most `n` mirrors.
#[arg(long, short, value_name = "n")]
number: Option<usize>,
// Protocols to keep; an empty list keeps every protocol.
#[arg(long, short, value_delimiter=',', value_name = "protocol", action = ArgAction::Append)]
protocol: Vec<Protocol>,
// Minimum completion percentage; divided by 100 before being compared
// with the mirror's `completion_pct`.
#[arg(long, value_name = "[0-100]", default_value_t = 100, value_parser = value_parser!(u8).range(0..=100))]
completion_percent: u8,
// Keep only mirrors whose `isos` flag is set.
#[arg(long, default_value_t = false)]
isos: bool,
// Keep only mirrors whose `ipv4` flag is set.
#[arg(long, default_value_t = false)]
ipv4: bool,
// Keep only mirrors whose `ipv6` flag is set.
#[arg(long, default_value_t = false)]
ipv6: bool,
}
/// Returns the path of the cache file `name` inside the user's XDG cache
/// directory, creating that directory if it does not exist yet.
///
/// `name` defaults to `mirrorstatus.json` when `None`.
///
/// # Errors
///
/// Returns an [`io::ErrorKind::NotFound`] error when the cache directory
/// cannot be determined, and forwards any error from creating the directory.
fn get_cache_file(name: Option<&str>) -> io::Result<PathBuf> {
    let name = name.unwrap_or("mirrorstatus.json");
    // A literal "~/.cache" is never expanded by the operating system: using it
    // as a fallback would create a directory called `~` below the current
    // working directory. Report the problem instead and let the caller decide
    // whether to continue without a cache.
    let cache_dir = BaseDirectories::new().get_cache_home().ok_or_else(|| {
        io::Error::new(
            io::ErrorKind::NotFound,
            "could not determine the user cache directory",
        )
    })?;
    fs::create_dir_all(&cache_dir)?;
    Ok(cache_dir.join(name))
}
async fn get_mirror_status(
http_client: &reqwest::Client,
run_options: &RunOptions,
url: &str,
cache_file_path: Option<PathBuf>,
) -> Result<(Status, SystemTime)> {
let Some(cache_file_path) = cache_file_path else {
let loaded = http_client.get(url).send().await?.json().await?;
return Ok((loaded, SystemTime::now()));
};
let mtime = cache_file_path
.metadata()
.ok()
.and_then(|meta| meta.modified().ok());
let is_valid = mtime
.and_then(|mtime| SystemTime::now().duration_since(mtime).ok())
.filter(|elapsed| elapsed.as_secs() <= run_options.cache_timeout)
.is_some();
if let Some(mtime) = mtime {
if is_valid {
let loaded = serde_json::from_reader(File::open(cache_file_path)?)?;
return Ok((loaded, mtime));
}
}
let loaded = http_client.get(url).send().await?.json().await?;
let to_write = serde_json::to_string_pretty(&loaded)?;
fs::write(cache_file_path, to_write)?;
Ok((loaded, SystemTime::now()))
}
/// A country as reported for a mirror, borrowed from the mirror data.
///
/// Used as the key when counting mirrors per country.
#[derive(PartialEq, Eq, Hash)]
struct Country<'a> {
/// Country name.
country: &'a str,
/// Country code.
code: &'a str,
}
/// Tallies how many of the given mirrors belong to each country.
///
/// Mirrors whose country code is empty are left out of the tally. The keys
/// of the returned map borrow their strings from the mirrors.
fn count_countries<'a>(
    mirrors: impl IntoIterator<Item = &'a Mirror>,
) -> HashMap<Country<'a>, usize> {
    mirrors
        .into_iter()
        .filter(|mirror| !mirror.country_code.is_empty())
        .fold(HashMap::new(), |mut tally, mirror| {
            let key = Country {
                country: mirror.country.as_ref(),
                code: mirror.country_code.as_ref(),
            };
            *tally.entry(key).or_insert(0) += 1;
            tally
        })
}
/// Information written into the header of the generated mirrorlist.
struct Metadata<'a> {
/// Time captured by `run` before the mirror status is loaded ("When").
when: Timestamp,
/// URL the mirror status is read from ("From").
origin: &'a str,
/// Time the mirror status was obtained: the cache file's modification time
/// when the cache was used, otherwise the time of the download ("Retrieved").
retrieved: SystemTime,
}
/// Executes the program for the parsed command line.
///
/// Loads the mirror status (from the cache when possible), then either lists
/// the countries or filters, limits and sorts the mirrors and writes the
/// result to the `--save` file or to standard output.
///
/// # Errors
///
/// Fails when the HTTP client cannot be built, the mirror status cannot be
/// obtained, or the output cannot be written.
async fn run(options: &Cli) -> anyhow::Result<()> {
    let run_options = &options.run;
    let filters = &run_options.filters;

    let http_client = reqwest::Client::builder()
        .timeout(Duration::from_secs(run_options.download_timeout))
        .connect_timeout(Duration::from_secs(run_options.connection_timeout))
        .build()?;
    // Without a usable cache location the status is simply downloaded.
    let cache_file = get_cache_file(None).ok();
    let when = Timestamp::now();
    let (mut status, mtime) =
        get_mirror_status(&http_client, run_options, &options.url, cache_file).await?;

    if options.list_countries {
        list_countries(&status);
        return Ok(());
    }

    filter_status(filters, &mut status);

    // Each limit sorts by its own key and keeps the first `n` mirrors; a
    // limit of zero means "not requested".
    if let Some(n) = filters.latest.filter(|&n| n > 0) {
        sort_status(SortType::Age, run_options, &http_client, &mut status).await;
        status.urls.truncate(n);
    }
    if let Some(n) = filters.score.filter(|&n| n > 0) {
        sort_status(SortType::Score, run_options, &http_client, &mut status).await;
        status.urls.truncate(n);
    }
    let fastest = filters.fastest.filter(|&n| n > 0);
    if let Some(n) = fastest {
        sort_status(SortType::Rate, run_options, &http_client, &mut status).await;
        status.urls.truncate(n);
    }

    // Apply the requested ordering. The only case that is skipped is
    // `--sort rate` right after `--fastest`: the list is already ordered by
    // rate and rating every mirror a second time would be wasted work.
    if let Some(sort_type) = run_options.sort {
        let already_rate_sorted = sort_type == SortType::Rate && fastest.is_some();
        if !already_rate_sorted {
            sort_status(sort_type, run_options, &http_client, &mut status).await;
        }
    }

    if let Some(n) = filters.number {
        status.urls.truncate(n);
    }

    let metadata = Metadata {
        when,
        origin: options.url.as_ref(),
        retrieved: mtime,
    };
    match (run_options.info, run_options.save.as_ref()) {
        (true, Some(path)) => {
            File::create(path).and_then(move |file| print_mirror_info(&status, file))?;
        }
        (false, Some(path)) => {
            File::create(path).and_then(move |file| format_output(&metadata, &status, file))?;
        }
        (true, None) => {
            print_mirror_info(&status, io::stdout())?;
        }
        (false, None) => {
            format_output(&metadata, &status, io::stdout())?;
        }
    }
    Ok(())
}
/// Writes a detailed, human-readable description of every mirror to `out`.
///
/// Each mirror is printed as its server URL followed by one `name: value`
/// line per field (names padded to a common width) and a blank line.
/// Optional fields without a value are printed as `None`.
///
/// # Errors
///
/// Returns the first error reported by `out`.
fn print_mirror_info(status: &Status, mut out: impl Write) -> io::Result<()> {
    /// Width the field names are padded to.
    const WIDTH: usize = 16;

    /// Writes a single `name: value` line.
    fn write_field(
        out: &mut impl Write,
        name: &str,
        value: impl std::fmt::Display,
    ) -> io::Result<()> {
        writeln!(out, "{name:WIDTH$}: {value}")
    }

    /// Writes a `name: value` line, using `None` for a missing value.
    fn write_optional<T: std::fmt::Display>(
        out: &mut impl Write,
        name: &str,
        value: Option<&T>,
    ) -> io::Result<()> {
        match value {
            Some(value) => write_field(out, name, value),
            None => write_field(out, name, "None"),
        }
    }

    for mirror in &status.urls {
        writeln!(out, "{}$repo/os/$arch", mirror.url)?;
        write_field(&mut out, "active", &mirror.active)?;
        write_optional(&mut out, "completion_pct", mirror.completion_pct.as_ref())?;
        write_field(&mut out, "country", &mirror.country)?;
        write_field(&mut out, "country_code", &mirror.country_code)?;
        write_optional(&mut out, "delay", mirror.delay.as_ref())?;
        write_field(&mut out, "details", &mirror.details)?;
        write_optional(
            &mut out,
            "duration_average",
            mirror.duration_average.as_ref(),
        )?;
        write_optional(&mut out, "duration_stddev", mirror.duration_stddev.as_ref())?;
        write_field(&mut out, "ipv4", &mirror.ipv4)?;
        // This line used to carry the label "ipv4" as well, which made the
        // IPv6 flag indistinguishable from the IPv4 one in the output.
        write_field(&mut out, "ipv6", &mirror.ipv6)?;
        write_field(&mut out, "isos", &mirror.isos)?;
        write_optional(&mut out, "last_sync", mirror.last_sync.as_ref())?;
        write_field(&mut out, "protocol", &mirror.protocol)?;
        write_optional(&mut out, "score", mirror.score.as_ref())?;
        writeln!(out)?;
    }
    Ok(())
}
/// Writes the mirrorlist to `out`: a banner, a commented header describing
/// how and when the list was produced, and one `Server = …` line per mirror.
///
/// The header records the command line of the current process, the
/// timestamps and origin held in `metadata`, and the status' last check
/// time. If the retrieval time cannot be converted to a `Timestamp`,
/// `metadata.when` is printed in its place.
///
/// # Errors
///
/// Returns the first error reported by `out`.
fn format_output(metadata: &Metadata, status: &Status, mut out: impl Write) -> io::Result<()> {
    let invocation: Vec<String> = std::env::args().collect();
    let invocation = invocation.join(" ");
    let retrieved = match Timestamp::try_from(metadata.retrieved) {
        Ok(timestamp) => timestamp,
        Err(_) => metadata.when,
    };

    writeln!(
        out,
        "################################################################################\n\
         ################# Arch Linux mirrorlist generated by Reflector #################\n\
         ################################################################################\n"
    )?;
    writeln!(
        out,
        "# With: {}\n# When: {}\n# From: {}\n# Retrieved: {}\n# Last Check: {}\n",
        invocation, metadata.when, metadata.origin, retrieved, status.last_check
    )?;

    status
        .urls
        .iter()
        .try_for_each(|mirror| writeln!(out, "Server = {}$repo/os/$arch", mirror.url))
}
/// Sorts `status.urls` in place so that the most desirable mirrors come first.
///
/// * `SortType::Age` - most recently synchronised first.
/// * `SortType::Rate` - highest measured download rate first; mirrors that
///   could not be rated come last. This downloads from every mirror.
/// * `SortType::Country` - by country name.
/// * `SortType::Score` - lowest score first (a lower mirror score is better).
/// * `SortType::Delay` - lowest delay first.
///
/// Mirrors lacking the value they are sorted by are placed at the end.
/// `run_options` and `http_client` are only used for `SortType::Rate`.
async fn sort_status(
    sort_type: SortType,
    run_options: &RunOptions,
    http_client: &reqwest::Client,
    status: &mut Status,
) {
    /// Ascending comparison that places missing values last.
    fn cmp_none_last<T: PartialOrd>(a: &Option<T>, b: &Option<T>) -> Ordering {
        match (a, b) {
            (Some(a), Some(b)) => a.partial_cmp(b).unwrap_or(Ordering::Equal),
            (Some(_), None) => Ordering::Less,
            (None, Some(_)) => Ordering::Greater,
            (None, None) => Ordering::Equal,
        }
    }

    match sort_type {
        // Newest synchronisation first, so that truncating the list keeps the
        // most up-to-date mirrors; `Reverse` also moves `None` to the end.
        SortType::Age => status.urls.sort_by_key(|mir| Reverse(mir.last_sync)),
        SortType::Rate => {
            let rates = rate_status(run_options, http_client, status).await;
            status
                .urls
                .sort_by(|a, b| match (rates.get(&a.url), rates.get(&b.url)) {
                    (Some(rate_a), Some(rate_b)) => rate_a
                        .partial_cmp(rate_b)
                        .unwrap_or(Ordering::Equal)
                        .reverse(),
                    (Some(_), None) => Ordering::Less,
                    (None, Some(_)) => Ordering::Greater,
                    (None, None) => Ordering::Equal,
                });
        }
        SortType::Country => status.urls.sort_by(|a, b| a.country.cmp(&b.country)),
        // Lower scores and lower delays denote better mirrors, so both are
        // sorted in ascending order.
        SortType::Score => status
            .urls
            .sort_by(|a, b| cmp_none_last(&a.score, &b.score)),
        SortType::Delay => status
            .urls
            .sort_by(|a, b| cmp_none_last(&a.delay, &b.delay)),
    }
}
#[allow(clippy::cast_precision_loss)]
async fn rate_status(
run_options: &RunOptions,
http_client: &reqwest::Client,
status: &Status,
) -> HashMap<Url, f64> {
const DB_FILENAME: &str = "extra.db";
const DB_SUBPATH: &str = "extra/os/x86_64/extra.db";
let mut task_set = JoinSet::<anyhow::Result<(Url, f64)>>::new();
let mut rates = HashMap::with_capacity(status.urls.len());
let semaphore = Arc::new(Semaphore::new(run_options.threads.max(1)));
let connection_timeout = run_options.connection_timeout;
for mirror in &status.urls {
let url = mirror.url.clone();
let semaphore = semaphore.clone();
match mirror.protocol {
Protocol::Http | Protocol::Https => {
let task_client = http_client.clone();
task_set.spawn(async move {
let _guard = semaphore.acquire().await?;
let db_url = url.join(DB_SUBPATH)?;
let start = Instant::now();
let mut content_length = 0;
let mut stream = task_client.get(db_url).send().await?.bytes_stream();
while let Some(chunk) = stream.next().await {
content_length += chunk?.len();
}
let micros = Instant::elapsed(&start).as_secs_f64();
let rate = (content_length as f64) / micros;
Ok((url, rate))
});
}
Protocol::Rsync => {
task_set.spawn(async move {
let _guard = semaphore.acquire().await?;
let temp_dir = tempfile::TempDir::new()?;
let db_url = url.join(DB_SUBPATH)?;
let start = Instant::now();
let exit_status = tokio::process::Command::new("rsync")
.arg("-avL")
.arg("--no-h")
.arg("--no-motd")
.arg(format!("--contimeout={connection_timeout}"))
.arg(db_url.as_str())
.arg(temp_dir.path())
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()?
.wait()
.await?;
if !exit_status.success() {
return Err(anyhow::anyhow!(exit_status));
}
let micros = Instant::elapsed(&start).as_secs_f64();
let file_path = Path::join(temp_dir.path(), DB_FILENAME);
let content_length = std::fs::metadata(file_path)?.len();
let rate = (content_length as f64) / micros;
Ok((url, rate))
});
}
}
}
while let Some(result) = task_set.join_next().await {
match result {
Ok(Ok((url, rate))) => {
rates.insert(url, rate);
}
Ok(Err(err)) => eprintln!("error while rating mirror: {err}"),
Err(err) => eprintln!("error while rating mirror: {err}"),
}
}
rates
}
/// Removes from `status.urls` every mirror that fails one of `filters`.
///
/// A mirror is kept only if all of the following hold:
/// * it has a last synchronisation time and, when `age` is positive, that
///   time is not more than `age` hours in the past;
/// * its completion percentage, if known, is at least `completion_percent`;
/// * it matches one of `country` (name or code, case-insensitive, `*`
///   matching everything) when any country is given;
/// * its protocol is listed in `protocol` when any protocol is given;
/// * its URL matches one of `include` (when given) and none of `exclude`;
/// * when `delay` is given, its delay is known and at most `delay` hours;
/// * it offers ISOs / IPv4 / IPv6 when the respective flag is set.
#[allow(clippy::cast_sign_loss)]
#[allow(clippy::cast_possible_truncation)]
fn filter_status(filters: &Filters, status: &mut Status) {
    let now = Timestamp::now();
    let min_completion_pct = f64::from(filters.completion_percent) / 100.0;

    // Oldest acceptable synchronisation time. The age is converted to
    // seconds so that fractional hours are honoured: truncating to whole
    // hours turned e.g. `--age 0.5` into a zero span, which disabled the
    // filter altogether. Computing the cut-off once with checked arithmetic
    // also avoids overflowing the timestamp range for very large ages; in
    // that case no mirror can be too old and the filter is skipped.
    let oldest_allowed_sync = filters
        .age
        .filter(|&hours| hours > 0.0)
        .and_then(|hours| {
            let seconds = (f64::from(hours) * 3600.0) as i64;
            Span::new().try_seconds(seconds).ok()
        })
        .and_then(|max_age| now.checked_sub(max_age).ok());
    // The mirror delay is compared against the limit converted to seconds.
    let max_delay = filters.delay.map(|hours| (hours * 3600.0) as u32);

    status.urls.retain(move |mirror| {
        // Mirrors that never reported a synchronisation are always dropped.
        let Some(last_sync) = mirror.last_sync else {
            return false;
        };
        if let Some(cutoff) = oldest_allowed_sync {
            if last_sync < cutoff {
                return false;
            }
        }

        if let Some(completion_pct) = mirror.completion_pct {
            if completion_pct < min_completion_pct {
                return false;
            }
        }

        if !filters.country.is_empty() {
            let country_matches = filters.country.iter().any(|wanted| {
                let wanted = wanted.trim();
                wanted == "*"
                    || wanted.eq_ignore_ascii_case(mirror.country.as_str())
                    || wanted.eq_ignore_ascii_case(mirror.country_code.as_str())
            });
            if !country_matches {
                return false;
            }
        }

        if !filters.protocol.is_empty() && !filters.protocol.contains(&mirror.protocol) {
            return false;
        }

        let url = mirror.url.as_str();
        if !filters.include.is_empty() && !filters.include.iter().any(|re| re.is_match(url)) {
            return false;
        }
        if filters.exclude.iter().any(|re| re.is_match(url)) {
            return false;
        }

        if let Some(max_delay) = max_delay {
            // A mirror with an unknown delay cannot satisfy a delay limit.
            match mirror.delay {
                Some(mirror_delay) if mirror_delay <= max_delay => {}
                _ => return false,
            }
        }

        if filters.isos && !mirror.isos {
            return false;
        }
        if filters.ipv4 && !mirror.ipv4 {
            return false;
        }
        if filters.ipv6 && !mirror.ipv6 {
            return false;
        }
        true
    });
}
fn list_countries(status: &Status) {
let counts = count_countries(&status.urls);
let mut sorted = vec![];
for (country, count) in counts {
sorted.push((country, count));
}
sorted.sort_by(|c1, c2| c1.0.code.cmp(c2.0.code));
let country_width = sorted
.iter()
.map(|(c, _)| c.country.len())
.max()
.unwrap_or(0)
.max("Country".len());
let code_width = sorted
.iter()
.map(|(c, _)| c.code.len())
.max()
.unwrap_or(0)
.max("Code".len());
let count_width = sorted
.iter()
.map(|(_, c)| c.ilog(10) as usize)
.max()
.unwrap_or(0)
.max("Count".len());
println!(
"{0:1$} {2:3$} {4:5$}",
"Country", country_width, "Code", code_width, "Count", count_width
);
println!(
"{0:1$} {2:3$} {4:5$}",
"=======", country_width, "====", code_width, "=====", count_width
);
for (country, count) in sorted {
println!(
"{0:1$} {2:3$} {4:5$}",
country.country, country_width, country.code, code_width, count, count_width
);
}
}
/// Turns the text of an argument file into individual arguments.
///
/// Blank lines and lines whose first non-blank character is `#` are skipped;
/// every remaining line is split on whitespace and each piece becomes one
/// pass-through argument. `_prefix` is unused.
fn convert_arg_line_to_args(content: &str, _prefix: char) -> Vec<argfile::Argument> {
    let mut arguments = Vec::new();
    for line in content.lines() {
        let line = line.trim();
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        arguments.extend(
            line.split_whitespace()
                .map(|token| argfile::Argument::PassThrough(OsString::from(token))),
        );
    }
    arguments
}
/// Program entry point: expands argument files, parses the command line,
/// builds the Tokio runtime and executes [`run`].
///
/// Every failure is reported on standard error and results in a failing exit
/// status, so that scripts and service managers can detect it.
fn main() -> std::process::ExitCode {
    let args = match argfile::expand_args(convert_arg_line_to_args, argfile::PREFIX) {
        Ok(args) => args,
        Err(err) => {
            eprintln!("error: {err}");
            return std::process::ExitCode::FAILURE;
        }
    };
    let cli = Cli::parse_from(args);

    let runtime = match tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        // A thread count of zero is treated as one worker.
        .worker_threads(cli.run.threads.max(1))
        .build()
    {
        Ok(runtime) => runtime,
        Err(err) => {
            eprintln!("error: {err}");
            return std::process::ExitCode::FAILURE;
        }
    };

    match runtime.block_on(run(&cli)) {
        Ok(()) => std::process::ExitCode::SUCCESS,
        Err(err) => {
            eprintln!("error: {err}");
            std::process::ExitCode::FAILURE
        }
    }
}