use crate::MKTOOL_DEFAULT_THREADS;
use clap::Args;
use indicatif::{HumanBytes, HumanDuration, ProgressBar, ProgressStyle};
use pkgsrc::distinfo::Distinfo;
use rayon::prelude::*;
use reqwest::blocking::Client;
use std::env;
use std::error::Error;
use std::fs;
use std::fs::File;
use std::io::{self, BufRead, BufReader};
use std::net::ToSocketAddrs;
use std::path::PathBuf;
use std::process;
use std::sync::LazyLock;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use suppaftp::{FtpStream, types::FileType, types::Mode};
use thiserror::Error;
use url::Url;
// Monotonically increasing counter used to generate unique temporary file
// names when several downloads of the same distfile run concurrently.
static FETCH_COUNTER: AtomicU64 = AtomicU64::new(0);
// TCP connect timeout, overridable (in seconds) via MKTOOL_CONNECT_TIMEOUT.
// An unset variable silently uses the default; a set-but-unparseable value
// warns once and falls back to the default.
static CONNECT_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    let secs = env::var("MKTOOL_CONNECT_TIMEOUT")
        .ok()
        .and_then(|v| {
            v.parse::<u64>()
                .map_err(|e| {
                    eprintln!(
                        "WARNING: invalid MKTOOL_CONNECT_TIMEOUT '{v}': {e}, using default"
                    );
                })
                .ok()
        })
        .unwrap_or(15);
    Duration::from_secs(secs)
});
// Socket read/write timeout, overridable (in seconds) via MKTOOL_READ_TIMEOUT.
// Same fallback policy as CONNECT_TIMEOUT: warn on a malformed value, stay
// silent when unset, default to 60 seconds either way.
static READ_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    let secs = env::var("MKTOOL_READ_TIMEOUT")
        .ok()
        .and_then(|v| {
            v.parse::<u64>()
                .map_err(|e| {
                    eprintln!(
                        "WARNING: invalid MKTOOL_READ_TIMEOUT '{v}': {e}, using default"
                    );
                })
                .ok()
        })
        .unwrap_or(60);
    Duration::from_secs(secs)
});
// Command-line arguments for the `fetch` subcommand.
// NOTE: plain `//` comments are used deliberately — `///` doc comments on a
// clap `Args` struct feed into the generated help text.
#[derive(Args, Debug)]
pub struct Fetch {
    // Default distfiles directory when an input line does not supply one.
    #[arg(short, value_name = "distdir", default_value = ".")]
    #[arg(help = "Directory where distfiles are stored")]
    distdir: PathBuf,
    // Optional distinfo file; when present, downloads are size- and
    // checksum-verified against it.
    #[arg(short = 'f', value_name = "distinfo")]
    #[arg(help = "Path to a distinfo file containing checksums")]
    distinfo: Option<PathBuf>,
    // List of files to fetch, one "<filepath> <distdir> [site ...]" per
    // line; "-" means stdin.
    #[arg(short = 'I', value_name = "input")]
    #[arg(help = "Read files from input")]
    input: Option<PathBuf>,
    // Worker thread count; falls back to MKTOOL_JOBS, then the built-in
    // default.
    #[arg(short = 'j', value_name = "jobs")]
    #[arg(help = "Maximum number of threads (or \"MKTOOL_JOBS\" env var)")]
    jobs: Option<usize>,
}
/// One file to download, parsed from a single input line.
#[derive(Clone, Debug)]
struct FetchFile {
    // Path of the distfile relative to distdir (may contain subdirectories).
    filepath: PathBuf,
    // Final path component of `filepath`, appended to each site URL.
    filename: String,
    // Directory the finished download is stored under.
    distdir: PathBuf,
    // Candidate download sites, tried in order until one succeeds.
    sites: Vec<String>,
    // Set to false by the fetch loop when every site fails.
    status: bool,
}
/// Errors that can occur while fetching a distfile.
#[derive(Error, Debug)]
pub enum FetchError {
    /// FTP protocol or connection failure (wrapped suppaftp error).
    #[error(transparent)]
    Ftp(#[from] suppaftp::FtpError),
    /// Filesystem or socket I/O failure.
    #[error(transparent)]
    Io(#[from] io::Error),
    /// No site produced the file (or the URL had no usable host).
    #[error("Unable to fetch file")]
    NotFound,
    /// HTTP client failure (wrapped reqwest error).
    #[error(transparent)]
    Reqwest(#[from] reqwest::Error),
    /// A site string could not be parsed as a URL.
    #[error(transparent)]
    Url(#[from] url::ParseError),
}
impl Fetch {
    /// Execute the fetch command: read the list of files to download,
    /// fetch them in parallel (verifying against distinfo when supplied),
    /// and return the process exit status (0 on success, 1 on any failure).
    ///
    /// # Errors
    ///
    /// Returns a [`FetchError`] for I/O failures reading the input list or
    /// building the HTTP client; per-file fetch failures are reported on
    /// stderr and reflected in the exit status instead.
    pub fn run(&self) -> Result<i32, FetchError> {
        let started = Instant::now();
        let mut files: Vec<FetchFile> = vec![];

        // Load the distinfo file (if one was supplied) so downloads can be
        // size- and checksum-verified.
        let distinfo = match &self.distinfo {
            Some(s) => match fs::read(s) {
                Ok(buf) => Some(Distinfo::from_bytes(&buf)),
                Err(e) => {
                    eprintln!(
                        "fetch: unable to read distinfo {}: {e}",
                        s.display()
                    );
                    return Ok(1);
                }
            },
            None => None,
        };

        // Each input line is "<filepath> <distdir> [site ...]"; a path of
        // "-" reads the list from stdin.
        if let Some(input) = &self.input {
            let reader: Box<dyn io::BufRead> = match input.to_str() {
                Some("-") => Box::new(io::stdin().lock()),
                Some(f) => Box::new(BufReader::new(File::open(f)?)),
                None => {
                    eprintln!(
                        "ERROR: File '{}' is not valid unicode.",
                        input.display()
                    );
                    // Return like every other error path (instead of
                    // process::exit) so destructors run and the exit status
                    // is produced in one place.
                    return Ok(1);
                }
            };
            for line in reader.lines() {
                let line = line?;
                let v: Vec<&str> = line.split_whitespace().collect();
                if v.len() < 2 {
                    eprintln!("fetch: Invalid input: {line}");
                    return Ok(1);
                }
                let filepath = PathBuf::from(v[0]);
                let distdir = PathBuf::from(v[1]);
                let sites = v
                    .get(2..)
                    .unwrap_or(&[])
                    .iter()
                    .map(|s| s.to_string())
                    .collect::<Vec<String>>();
                let filename =
                    match filepath.file_name().and_then(|f| f.to_str()) {
                        Some(f) => f.to_string(),
                        None => {
                            eprintln!(
                                "fetch: invalid filename: {}",
                                filepath.display()
                            );
                            return Ok(1);
                        }
                    };
                files.push(FetchFile {
                    filepath,
                    filename,
                    distdir,
                    sites,
                    status: true,
                });
            }
        }

        // Thread count: the -j flag wins, then MKTOOL_JOBS, then the
        // compiled-in default.
        let nthreads = match self.jobs {
            Some(n) => n,
            None => match env::var("MKTOOL_JOBS") {
                Ok(n) => match n.parse::<usize>() {
                    Ok(n) => n,
                    Err(e) => {
                        eprintln!(
                            "WARNING: invalid MKTOOL_JOBS '{n}': {e}, using default"
                        );
                        MKTOOL_DEFAULT_THREADS
                    }
                },
                Err(_) => MKTOOL_DEFAULT_THREADS,
            },
        };
        rayon::ThreadPoolBuilder::new()
            .num_threads(nthreads)
            .build_global()
            .unwrap();

        let style = ProgressStyle::with_template(
            "{prefix:>12} [{bar:57}] {binary_bytes:>7}/{binary_total_bytes:7}",
        )
        .unwrap()
        .progress_chars("=> ");
        let progress =
            ProgressBar::new(0).with_prefix("Downloading").with_style(style);
        let client = build_client()?;

        // Fetch everything in parallel, marking failures in-place so the
        // exit status can be computed afterwards.
        files.par_iter_mut().for_each(|file| {
            if let Err(e) =
                fetch_and_verify(&client, file, &distinfo, &progress)
            {
                progress.suspend(|| {
                    eprintln!(
                        "Failed to fetch {}: {e}",
                        file.distdir.join(&file.filepath).display(),
                    );
                });
                file.status = false;
            }
        });
        progress.finish_and_clear();

        let rv = if files.iter().all(|f| f.status) { 0 } else { 1 };

        // Print a transfer summary only when something was downloaded and
        // everything succeeded.
        if rv == 0 {
            if let Some(dsize) = progress.length().filter(|&len| len > 0) {
                let dtime = started.elapsed();
                // Scale before dividing (the old code divided first, losing
                // precision) and clamp the divisor to avoid a divide-by-zero
                // panic when the elapsed time is under one millisecond.
                let rate =
                    (u128::from(dsize) * 1000 / dtime.as_millis().max(1)) as u64;
                println!(
                    "Downloaded {} in {} ({}/s)",
                    HumanBytes(dsize),
                    HumanDuration(dtime),
                    HumanBytes(rate)
                );
            }
        }
        Ok(rv)
    }
}
/// Build the download URL for `filename` at `site`.
///
/// A site beginning with '-' is a complete URL already (the pkgsrc
/// convention for an exact-location override): the dash is stripped and
/// `filename` is ignored. Otherwise `filename` is appended, inserting a
/// '/' separator unless the site already ends with one.
fn url_from_site(site: &str, filename: &str) -> String {
    match site.strip_prefix('-') {
        Some(full_url) => full_url.to_string(),
        None if site.ends_with('/') => format!("{site}{filename}"),
        None => format!("{site}/{filename}"),
    }
}
/// Download `url` into `filename` over FTP, feeding transferred bytes
/// through `progress`. Returns the size in bytes of the written file.
///
/// The TCP connection is made by hand (rather than `FtpStream::connect`)
/// so that the configurable connect/read/write timeouts can be applied to
/// the control socket.
fn fetch_ftp(
    url: &Url,
    filename: &PathBuf,
    progress: &ProgressBar,
) -> Result<u64, FetchError> {
    // A URL without a host component cannot be fetched.
    let host = url.host_str().ok_or(FetchError::NotFound)?;
    let path = url.path();
    let port = url.port().unwrap_or(21); // standard FTP control port
    let addrs: Vec<_> = (host, port).to_socket_addrs()?.collect();
    if addrs.is_empty() {
        return Err(FetchError::NotFound);
    }
    // Try every resolved address in turn, remembering the most recent
    // connect error so a meaningful message survives if they all fail.
    let mut last_err = None;
    let (stream, addr) = addrs
        .into_iter()
        .find_map(|addr| {
            match std::net::TcpStream::connect_timeout(&addr, *CONNECT_TIMEOUT)
            {
                Ok(s) => Some((s, addr)),
                Err(e) => {
                    last_err = Some(e);
                    None
                }
            }
        })
        .ok_or_else(|| {
            suppaftp::FtpError::ConnectionError(last_err.unwrap_or_else(|| {
                io::Error::new(io::ErrorKind::NotFound, "no addresses resolved")
            }))
        })?;
    stream
        .set_read_timeout(Some(*READ_TIMEOUT))
        .map_err(suppaftp::FtpError::ConnectionError)?;
    stream
        .set_write_timeout(Some(*READ_TIMEOUT))
        .map_err(suppaftp::FtpError::ConnectionError)?;
    let mut ftp = FtpStream::connect_with_stream(stream)?;
    // Extended passive mode (EPSV) is required for IPv6 data connections.
    if addr.is_ipv6() {
        ftp.set_mode(Mode::ExtendedPassive);
    }
    ftp.login("anonymous", "anonymous")?;
    ftp.transfer_type(FileType::Binary)?;
    let mut ftpfile = ftp.retr_as_stream(path)?;
    let file = File::create(filename)?;
    // Stream the retrieval through the progress bar's writer wrapper so the
    // bar advances as bytes land on disk.
    std::io::copy(&mut ftpfile, &mut progress.wrap_write(&file))?;
    ftp.finalize_retr_stream(ftpfile)?;
    ftp.quit()?;
    Ok(file.metadata()?.len())
}
/// Fetch one distfile, trying each of its sites in order, and verify it
/// against `distinfo` when checksums are available. Returns the size in
/// bytes of the file on disk.
///
/// An already-present file that passes the distinfo size check (or any
/// present file when there is no distinfo) is accepted without refetching.
/// Downloads go to a uniquely-named temporary file which is renamed into
/// place only after verification, so a concurrent or failed fetch never
/// leaves a corrupt file under the final name.
fn fetch_and_verify(
    client: &Client,
    file: &FetchFile,
    distinfo: &Option<Distinfo>,
    progress: &ProgressBar,
) -> Result<u64, FetchError> {
    let mut file_name = PathBuf::from(&file.distdir);
    file_name.push(&file.filepath);
    // The filepath may contain subdirectories; make sure they exist.
    if let Some(dir) = file_name.parent() {
        fs::create_dir_all(dir)?;
    }
    if file_name.exists() {
        if let Some(di) = distinfo {
            match di.verify_size(&file_name) {
                // Size matches distinfo: keep the existing file.
                Ok(s) => return Ok(s),
                // Wrong size: discard and refetch below.
                Err(_) => fs::remove_file(&file_name)?,
            }
        } else {
            // No distinfo to check against; trust the existing file.
            return Ok(file_name.metadata()?.len());
        }
    }
    // Temporary name made unique by pid + a process-wide counter, so
    // parallel fetches (and unrelated mktool processes) never collide.
    let counter = FETCH_COUNTER.fetch_add(1, Ordering::Relaxed);
    let temp_name = file_name.with_extension(format!(
        "{}.mktool.{}.{}",
        file_name.extension().map(|s| s.to_str().unwrap_or("")).unwrap_or(""),
        process::id(),
        counter
    ));
    // Expected size from distinfo (0 when unknown) pre-grows the progress
    // bar; for HTTP the Content-Length fills the gap later.
    let expected_size = if let Some(di) = distinfo {
        match di.distfile(&file.filepath) {
            Some(e) => e.size.unwrap_or(0),
            None => 0,
        }
    } else {
        0
    };
    progress.inc_length(expected_size);
    if progress.is_hidden() {
        println!("Fetching {}", &file.filename);
    } else {
        progress.println(format!("{:>12} {}", "Fetching", &file.filename));
    }
    if file.sites.is_empty() {
        eprintln!("No fetch sites available for {}", &file.filename);
        return Err(FetchError::NotFound);
    }
    // Try each site in turn; any failure (connect, transfer, or checksum)
    // moves on to the next site via the labeled continue.
    'nextsite: for site in &file.sites {
        let url = url_from_site(site, &file.filename);
        let parseurl = Url::parse(&url)?;
        if parseurl.scheme() == "ftp" {
            // FTP branch: both match arms return or continue, so control
            // never falls through to the HTTP client for ftp:// URLs.
            match fetch_ftp(&parseurl, &temp_name, progress) {
                Ok(_) => {
                    if let Some(di) = distinfo {
                        if let Some(entry) = di.distfile(&file.filepath) {
                            for result in entry.verify_checksums(&temp_name) {
                                if let Err(e) = result {
                                    progress.suspend(|| {
                                        eprintln!(
                                            "Verification failed for {url}: {e}"
                                        );
                                    });
                                    remove_temp(&temp_name);
                                    continue 'nextsite;
                                }
                            }
                        }
                    }
                    return rename_to_final(&temp_name, &file_name);
                }
                Err(e) => {
                    progress.suspend(|| {
                        eprintln!("Unable to fetch {url}: {e}");
                    });
                    remove_temp(&temp_name);
                    continue 'nextsite;
                }
            }
        }
        // HTTP(S) branch.
        match client.get(&url).send() {
            Ok(mut body) => {
                // distinfo gave no size: grow the bar by Content-Length.
                if expected_size == 0 {
                    if let Some(len) = body.content_length() {
                        progress.inc_length(len);
                    }
                }
                if !&body.status().is_success() {
                    progress.suspend(|| {
                        eprintln!(
                            "Unable to fetch {}: {}",
                            url,
                            &body.status()
                        );
                    });
                    continue;
                }
                let tempfile = File::create(&temp_name)?;
                if let Err(e) =
                    body.copy_to(&mut progress.wrap_write(&tempfile))
                {
                    // Close the handle before unlinking (matters on
                    // platforms where open files cannot be removed).
                    drop(tempfile);
                    remove_temp(&temp_name);
                    progress.suspend(|| {
                        eprintln!("Unable to fetch {url}: {e}");
                    });
                    continue 'nextsite;
                }
                drop(tempfile);
                if let Some(di) = distinfo {
                    if let Some(entry) = di.distfile(&file.filepath) {
                        for result in entry.verify_checksums(&temp_name) {
                            if let Err(e) = result {
                                progress.suspend(|| {
                                    eprintln!(
                                        "Verification failed for {url}: {e}"
                                    );
                                });
                                remove_temp(&temp_name);
                                continue 'nextsite;
                            }
                        }
                    }
                }
                return rename_to_final(&temp_name, &file_name);
            }
            Err(e) => {
                // reqwest errors nest the underlying cause; dig up to two
                // levels so the message names the most specific failure.
                let errmsg = if let Some(reqwest) = e.source() {
                    if let Some(hyper) = reqwest.source() {
                        format!("Unable to fetch {url}: {hyper}")
                    } else {
                        format!("Unable to fetch {url}: {reqwest}")
                    }
                } else {
                    format!("Unable to fetch {url}: {e}")
                };
                progress.suspend(|| {
                    eprintln!("{errmsg}");
                });
            }
        }
    }
    // Every site failed; clean up any leftover partial download.
    remove_temp(&temp_name);
    Err(FetchError::NotFound)
}
/// Build the shared HTTP client using the bundled webpki root certificates
/// (for platforms without a usable system trust store).
#[cfg(feature = "webpki-roots")]
fn build_client() -> Result<Client, reqwest::Error> {
    let root_store = rustls::RootCertStore::from_iter(
        webpki_roots::TLS_SERVER_ROOTS.iter().cloned(),
    );
    let tls_config = rustls::ClientConfig::builder()
        .with_root_certificates(root_store)
        .with_no_client_auth();
    Client::builder()
        .referer(false)
        .connect_timeout(*CONNECT_TIMEOUT)
        .tls_backend_preconfigured(tls_config)
        .build()
}
/// Build the shared HTTP client with reqwest's default TLS backend.
#[cfg(not(feature = "webpki-roots"))]
fn build_client() -> Result<Client, reqwest::Error> {
    Client::builder().referer(false).connect_timeout(*CONNECT_TIMEOUT).build()
}
/// Best-effort removal of a temporary download file.
///
/// A file that is already gone is fine (another cleanup path may have
/// beaten us to it); any other failure is reported on stderr but never
/// propagated, since leftover temp files are harmless.
fn remove_temp(path: &PathBuf) {
    match fs::remove_file(path) {
        Ok(()) => {}
        Err(e) if e.kind() == io::ErrorKind::NotFound => {}
        Err(e) => eprintln!("Failed to remove {}: {e}", path.display()),
    }
}
/// Move a verified temporary download into its final location and return
/// the final file's size in bytes.
///
/// If another thread or process already installed the final file, the
/// temporary copy is discarded (best-effort, warning on failure) and the
/// existing file's size is returned instead.
fn rename_to_final(
    temp: &PathBuf,
    final_path: &PathBuf,
) -> Result<u64, FetchError> {
    if !final_path.exists() {
        fs::rename(temp, final_path)?;
    } else if let Err(e) = fs::remove_file(temp) {
        eprintln!("Failed to remove {}: {e}", temp.display());
    }
    Ok(final_path.metadata()?.len())
}