use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use hf_hub::api::tokio::ApiRepo;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use crate::checksum;
use crate::chunked;
use crate::config::{file_matches, FetchConfig, ProgressCallback};
use crate::error::{FetchError, FileFailure};
use crate::progress;
use crate::repo::{self, RepoFile};
use crate::retry::{self, RetryPolicy};
const DEFAULT_TIMEOUT_PER_FILE: Duration = Duration::from_secs(300);
/// Result of a fetch operation, distinguishing a value served entirely from
/// the local cache from one that required network downloads.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum DownloadOutcome<T> {
    /// Resolved from the local cache; no network traffic was needed.
    Cached(T),
    /// At least part of the value was downloaded from the remote.
    Downloaded(T),
}

impl<T> DownloadOutcome<T> {
    /// Consumes the outcome and returns the wrapped value.
    #[must_use]
    pub fn into_inner(self) -> T {
        match self {
            Self::Downloaded(value) | Self::Cached(value) => value,
        }
    }

    /// Returns `true` when the value came from the local cache.
    #[must_use]
    pub fn is_cached(&self) -> bool {
        match self {
            Self::Cached(_) => true,
            Self::Downloaded(_) => false,
        }
    }

    /// Borrows the wrapped value without consuming the outcome.
    #[must_use]
    pub fn inner(&self) -> &T {
        match self {
            Self::Downloaded(value) | Self::Cached(value) => value,
        }
    }
}
/// Effective download tuning parameters, resolved from an optional
/// user-supplied `FetchConfig` (see `from_config`) and possibly adjusted
/// afterwards by plan recommendations (`merge_plan_recommended`).
#[derive(Clone)]
struct DownloadSettings {
    // Retry policy applied to each per-file download attempt.
    retry_policy: RetryPolicy,
    // Hard timeout applied to each single-file transfer attempt.
    timeout_per_file: Duration,
    // Optional wall-clock budget for the whole multi-file operation.
    timeout_total: Option<Duration>,
    // Maximum number of files downloaded concurrently (clamped to >= 1).
    concurrency: usize,
    // Parallel range-request connections used per file in chunked mode.
    connections_per_file: usize,
    // Files with size >= this many bytes use the chunked path;
    // u64::MAX effectively disables chunking.
    chunk_threshold: u64,
    // Verify SHA-256 digests after download when metadata provides them.
    verify_checksums: bool,
}
impl DownloadSettings {
    /// Resolves effective settings from an optional user config, falling back
    /// to built-in defaults when `config` is `None` or a field is unset.
    fn from_config(config: Option<&FetchConfig>) -> Self {
        Self {
            retry_policy: RetryPolicy {
                max_retries: config.map_or(3, |c| c.max_retries),
                ..RetryPolicy::default()
            },
            timeout_per_file: config
                .and_then(|c| c.timeout_per_file)
                .unwrap_or(DEFAULT_TIMEOUT_PER_FILE),
            timeout_total: config.and_then(|c| c.timeout_total),
            // Clamp to at least 1: zero permits would mean no download task
            // could ever acquire the semaphore.
            concurrency: config.map_or(4, |c| c.concurrency).max(1),
            // Clamp to at least 1 as well: a configured 0 would make chunked
            // downloads impossible (no connections to split ranges across).
            connections_per_file: config.map_or(8, |c| c.connections_per_file).max(1),
            chunk_threshold: config.map_or(u64::MAX, |c| c.chunk_threshold),
            verify_checksums: config.is_some_and(|c| c.verify_checksums),
        }
    }
}
/// Downloads (or resolves from cache) every matching file in the repository
/// and returns the snapshot root directory containing them.
pub async fn download_all_files(
    repo: ApiRepo,
    repo_id: String,
    config: Option<&FetchConfig>,
) -> Result<DownloadOutcome<PathBuf>, FetchError> {
    let repo_id_for_error = repo_id.clone();
    let outcome = download_all_files_map(repo, repo_id, config).await?;
    let was_cached = outcome.is_cached();
    // Any single (filename, path) entry suffices to derive the snapshot root.
    let (filename, path) = outcome
        .into_inner()
        .into_iter()
        .next()
        .ok_or_else(|| FetchError::NoFilesMatched {
            repo_id: repo_id_for_error,
        })?;
    let root = snapshot_root(&filename, &path);
    Ok(if was_cached {
        DownloadOutcome::Cached(root)
    } else {
        DownloadOutcome::Downloaded(root)
    })
}
/// Downloads every matching file in the repository, returning a map from
/// repo-relative filename to local path.
///
/// Flow: (1) fast path — if the whole repo resolves from the local cache,
/// return `Cached` with no network traffic; (2) list repo files and apply the
/// include/exclude globs; (3) optionally fetch extended metadata (needed for
/// checksum verification and chunked downloads); (4) merge plan-recommended
/// settings and emit a best-effort disk-space warning; (5) download the files
/// concurrently, bounded by a semaphore.
///
/// # Errors
/// `FetchError::Timeout` when the optional total budget is exceeded,
/// `FetchError::PartialDownload` when some files failed, and
/// `FetchError::NoFilesMatched` when nothing matched the filters.
pub async fn download_all_files_map(
    repo: ApiRepo,
    repo_id: String,
    config: Option<&FetchConfig>,
) -> Result<DownloadOutcome<HashMap<String, PathBuf>>, FetchError> {
    let overall_start = tokio::time::Instant::now();
    // Fast path: serve the entire request from the local cache when possible.
    if let Some(file_map) = try_resolve_repo_from_cache(config, repo_id.as_str())? {
        return Ok(DownloadOutcome::Cached(file_map));
    }
    tracing::debug!(repo_id = %repo_id, "listing repository files");
    let include = config.and_then(|c| c.include.as_ref());
    let exclude = config.and_then(|c| c.exclude.as_ref());
    let all_files = repo::list_repo_files(&repo, repo_id.clone()).await?;
    let files: Vec<_> = all_files
        .into_iter()
        .filter(|f| file_matches(f.filename.as_str(), include, exclude))
        .collect();
    let mut settings = DownloadSettings::from_config(config);
    let on_progress = config.and_then(|c| c.on_progress.clone());
    // Extended metadata (sizes/sha256) is fetched only when checksums or
    // chunked downloads need it; a failed fetch degrades to an empty map.
    let metadata_map = fetch_metadata_if_needed(
        config,
        repo_id.as_str(),
        settings.verify_checksums,
        settings.chunk_threshold,
    )
    .await;
    let (http_client, chunked_client, cache_dir, repo_folder, revision, token) =
        build_shared_state(config, repo_id.as_str(), &settings)?;
    // Plan recommendations may adjust concurrency / connections / threshold
    // for any knob the user did not set explicitly.
    merge_plan_recommended(
        &mut settings,
        config,
        &files,
        &metadata_map,
        &cache_dir,
        &repo_folder,
        &revision,
    );
    let total = files.len();
    tracing::debug!(
        total_files = total,
        concurrency = settings.concurrency,
        "download settings (after plan optimization)"
    );
    // Advisory only; never aborts the download.
    check_disk_space(
        &cache_dir,
        &files,
        &metadata_map,
        repo_folder.as_str(),
        revision.as_str(),
    );
    let repo = Arc::new(repo);
    let metadata_map = Arc::new(metadata_map);
    // Caps how many files download at once.
    let semaphore = Arc::new(Semaphore::new(settings.concurrency));
    let completed = Arc::new(AtomicUsize::new(0));
    let mut join_set = JoinSet::new();
    for file in files {
        // Enforce the total budget between spawns (it is also enforced while
        // collecting results below).
        if let Some(total_limit) = settings.timeout_total {
            if overall_start.elapsed() >= total_limit {
                join_set.abort_all();
                return Err(FetchError::Timeout {
                    filename: file.filename,
                    seconds: total_limit.as_secs(),
                });
            }
        }
        // Acquire before spawning so at most `concurrency` tasks run at a
        // time; the owned permit is dropped when the task finishes.
        let permit = Arc::clone(&semaphore)
            .acquire_owned()
            .await
            .map_err(|e| FetchError::Http(e.to_string()))?;
        let task_repo = Arc::clone(&repo);
        let task_meta = Arc::clone(&metadata_map);
        let task_chunked_client = chunked_client.clone();
        let task_http_client = Arc::clone(&http_client);
        let task_cache_dir = cache_dir.clone();
        let task_repo_folder = Arc::clone(&repo_folder);
        let task_revision = Arc::clone(&revision);
        let task_repo_id = repo_id.clone();
        let task_token = Arc::clone(&token);
        let task_settings = settings.clone();
        let task_on_progress = on_progress.clone();
        let task_completed = Arc::clone(&completed);
        join_set.spawn(async move {
            let result = dispatch_download(
                &task_repo,
                &file,
                &task_meta,
                task_chunked_client.as_deref(),
                &task_http_client,
                &task_cache_dir,
                &task_repo_folder,
                &task_revision,
                task_repo_id.as_str(),
                (*task_token).clone(),
                &task_settings,
                task_on_progress,
                // Spawn-time estimate of the files still outstanding; used
                // for progress reporting only.
                total.saturating_sub(task_completed.load(Ordering::Relaxed) + 1),
            )
            .await;
            drop(permit);
            (file, result)
        });
    }
    let (file_map, failures) = collect_results(
        &mut join_set,
        settings.timeout_total,
        overall_start,
        on_progress.as_ref(),
        total,
        &completed,
    )
    .await?;
    let file_map = validate_download_results(file_map, failures, repo_id.as_str())?;
    tracing::debug!(files_downloaded = file_map.len(), "download complete");
    Ok(DownloadOutcome::Downloaded(file_map))
}
async fn download_single_file(
repo: &ApiRepo,
file: &RepoFile,
metadata_map: &HashMap<String, RepoFile>,
verify_checksums: bool,
retry_policy: &RetryPolicy,
timeout: Duration,
) -> Result<PathBuf, FetchError> {
let filename = file.filename.clone();
let path = retry::retry_async(retry_policy, retry::is_retryable, || {
let fname = filename.clone();
let timeout_dur = timeout;
async move {
let download_fut = repo.get(fname.as_str());
tokio::time::timeout(timeout_dur, download_fut)
.await
.map_err(|_elapsed| FetchError::Timeout {
filename: fname.clone(),
seconds: timeout_dur.as_secs(),
})?
.map_err(FetchError::Api)
}
})
.await?;
if verify_checksums {
if let Some(meta) = metadata_map.get(file.filename.as_str()) {
if let Some(ref expected_sha) = meta.sha256 {
checksum::verify_sha256(&path, file.filename.as_str(), expected_sha.as_str())
.await?;
}
}
}
Ok(path)
}
#[allow(clippy::too_many_arguments)]
async fn download_single_file_chunked(
client: &reqwest::Client,
file: &RepoFile,
cache_dir: &std::path::Path,
repo_folder: &str,
revision: &str,
repo_id: &str,
token: Option<String>,
metadata_map: &HashMap<String, RepoFile>,
verify_checksums: bool,
retry_policy: &RetryPolicy,
connections: usize,
on_progress: Option<ProgressCallback>,
files_remaining: usize,
) -> Result<PathBuf, FetchError> {
let url = chunked::build_download_url(repo_id, revision, file.filename.as_str());
let range_info = chunked::probe_range_support(client.clone(), url, token).await?;
let Some(range_info) = range_info else {
return Err(FetchError::ChunkedDownload {
filename: file.filename.clone(),
reason: String::from("server does not support Range requests"),
});
};
let path = chunked::download_chunked(
client.clone(),
range_info,
cache_dir.to_path_buf(),
repo_folder.to_owned(),
revision.to_owned(),
file.filename.clone(),
connections,
retry_policy.clone(),
on_progress,
files_remaining,
)
.await?;
if verify_checksums {
if let Some(meta) = metadata_map.get(file.filename.as_str()) {
if let Some(ref expected_sha) = meta.sha256 {
checksum::verify_sha256(&path, file.filename.as_str(), expected_sha.as_str())
.await?;
}
}
}
Ok(path)
}
/// Downloads (or resolves from cache) a single named file from the repo.
///
/// Checks the local cache first; otherwise optionally fetches metadata so the
/// file's size can drive the chunked-vs-single-connection decision and the
/// disk-space warning, then dispatches the download and fires a final
/// progress event on success.
///
/// # Errors
/// Propagates cache-dir resolution, client-build, and download failures as
/// `FetchError`.
pub(crate) async fn download_file_by_name(
    repo: ApiRepo,
    repo_id: String,
    filename: &str,
    config: &FetchConfig,
) -> Result<DownloadOutcome<PathBuf>, FetchError> {
    let cache_dir = config
        .output_dir
        .clone()
        .map_or_else(crate::cache::hf_cache_dir, Ok)?;
    let repo_folder = chunked::repo_folder_name(repo_id.as_str());
    let revision_str = config.revision.as_deref().unwrap_or("main");
    // Cache fast path: no network when the file is already materialized.
    if let Some(cached) =
        resolve_cached_file(&cache_dir, repo_folder.as_str(), revision_str, filename)
    {
        return Ok(DownloadOutcome::Cached(cached));
    }
    let settings = DownloadSettings::from_config(Some(config));
    let on_progress = config.on_progress.clone();
    let metadata_map = fetch_metadata_if_needed(
        Some(config),
        repo_id.as_str(),
        settings.verify_checksums,
        settings.chunk_threshold,
    )
    .await;
    // The disk-space check runs only when metadata supplied a size.
    if let Some(size) = metadata_map.get(filename).and_then(|m| m.size) {
        let single_file = RepoFile {
            filename: filename.to_owned(),
            size: Some(size),
            sha256: None,
        };
        check_disk_space(
            &cache_dir,
            &[single_file],
            &metadata_map,
            repo_folder.as_str(),
            revision_str,
        );
    }
    let file_meta = metadata_map.get(filename);
    let file = RepoFile {
        filename: filename.to_owned(),
        size: file_meta.and_then(|m| m.size),
        sha256: file_meta.and_then(|m| m.sha256.clone()),
    };
    let http_client = chunked::build_client(config.token.as_deref())?;
    let revision = revision_str.to_owned();
    // A finite chunk threshold enables the chunked path; otherwise pass None
    // so dispatch uses a single connection.
    let chunked_client = if settings.chunk_threshold < u64::MAX {
        Some(&http_client)
    } else {
        None
    };
    let result = dispatch_download(
        &repo,
        &file,
        &metadata_map,
        chunked_client,
        &http_client,
        &cache_dir,
        repo_folder.as_str(),
        revision.as_str(),
        repo_id.as_str(),
        config.token.clone(),
        &settings,
        on_progress.clone(),
        // files_remaining: this is the only file in flight.
        0, )
    .await;
    let path = result?;
    // Emit a final completion event; a metadata read failure degrades to
    // reporting size 0.
    if let Some(ref cb) = on_progress {
        let file_size = tokio::fs::metadata(&path)
            .await
            .map(|m| m.len())
            .unwrap_or(0);
        let event = progress::completed_event(filename, file_size, 0);
        cb(&event);
    }
    Ok(DownloadOutcome::Downloaded(path))
}
/// Builds the state shared by all download tasks — HTTP clients, cache
/// location, repo folder name, revision, and auth token — each wrapped in
/// `Arc` so per-file tasks can hold cheap clones.
#[allow(clippy::type_complexity)]
fn build_shared_state(
    config: Option<&FetchConfig>,
    repo_id: &str,
    settings: &DownloadSettings,
) -> Result<
    (
        Arc<reqwest::Client>,
        Option<Arc<reqwest::Client>>,
        Arc<PathBuf>,
        Arc<String>,
        Arc<String>,
        Arc<Option<String>>,
    ),
    FetchError,
> {
    let http_client = Arc::new(chunked::build_client(
        config.and_then(|c| c.token.as_deref()),
    )?);
    // A finite chunk threshold means chunked downloads are enabled; they
    // share the same underlying client.
    let chunked_client = (settings.chunk_threshold < u64::MAX).then(|| Arc::clone(&http_client));
    let cache_dir = config
        .and_then(|c| c.output_dir.clone())
        .map_or_else(crate::cache::hf_cache_dir, Ok)?;
    let revision = config
        .and_then(|c| c.revision.clone())
        .unwrap_or_else(|| String::from("main"));
    Ok((
        http_client,
        chunked_client,
        Arc::new(cache_dir),
        Arc::new(chunked::repo_folder_name(repo_id)),
        Arc::new(revision),
        Arc::new(config.and_then(|c| c.token.clone())),
    ))
}
/// Routes one file to the appropriate download strategy and applies the
/// HTTP 416 fallback.
///
/// Order of checks: (1) local cache hit; (2) chunked multi-connection
/// download when the size is known, a chunked client is available, and the
/// file meets the threshold; (3) otherwise a single-connection download via
/// the API client. If the chosen path fails and the error text indicates
/// 416 / Range Not Satisfiable, retries once as a plain direct download.
#[allow(clippy::too_many_arguments)]
async fn dispatch_download(
    repo: &ApiRepo,
    file: &RepoFile,
    metadata_map: &HashMap<String, RepoFile>,
    chunked_client: Option<&reqwest::Client>,
    http_client: &reqwest::Client,
    cache_dir: &std::path::Path,
    repo_folder: &str,
    revision: &str,
    repo_id: &str,
    token: Option<String>,
    settings: &DownloadSettings,
    on_progress: Option<ProgressCallback>,
    files_remaining: usize,
) -> Result<PathBuf, FetchError> {
    if let Some(cached) =
        resolve_cached_file(cache_dir, repo_folder, revision, file.filename.as_str())
    {
        return Ok(cached);
    }
    // Size comes from the (optional) metadata fetch; None disables chunking.
    let file_size = metadata_map
        .get(file.filename.as_str())
        .and_then(|m| m.size);
    let start = std::time::Instant::now();
    let result = if let (Some(size), Some(client)) = (file_size, chunked_client) {
        if size >= settings.chunk_threshold {
            tracing::debug!(
                filename = %file.filename,
                size_mib = size / 1_048_576,
                connections = settings.connections_per_file,
                "chunked download (multi-connection)"
            );
            download_single_file_chunked(
                client,
                file,
                cache_dir,
                repo_folder,
                revision,
                repo_id,
                token,
                metadata_map,
                settings.verify_checksums,
                &settings.retry_policy,
                settings.connections_per_file,
                on_progress,
                files_remaining,
            )
            .await
        } else {
            tracing::debug!(
                filename = %file.filename,
                size_mib = size / 1_048_576,
                "single-connection download (below chunk threshold)"
            );
            download_single_file(
                repo,
                file,
                metadata_map,
                settings.verify_checksums,
                &settings.retry_policy,
                settings.timeout_per_file,
            )
            .await
        }
    } else {
        let reason = if file_size.is_none() {
            "file size unknown (metadata missing)"
        } else {
            "chunked downloads disabled"
        };
        tracing::debug!(
            filename = %file.filename,
            file_size = ?file_size,
            reason = reason,
            "single-connection download"
        );
        download_single_file(
            repo,
            file,
            metadata_map,
            settings.verify_checksums,
            &settings.retry_policy,
            settings.timeout_per_file,
        )
        .await
    };
    // A 416-style failure falls back to a single plain direct download.
    let result = if is_range_not_satisfiable(&result) {
        chunked::download_direct(
            http_client,
            repo_id,
            revision,
            file.filename.as_str(),
            cache_dir,
        )
        .await
    } else {
        result
    };
    log_download_result(file.filename.as_str(), &result, file_size, start.elapsed());
    result
}
/// Drains the join set, recording successes into a filename->path map and
/// failures into a `FileFailure` list, firing progress callbacks as files
/// complete.
///
/// The optional total budget is now enforced *while waiting* for the next
/// task via `tokio::time::timeout_at`, so a stalled download cannot block
/// past the deadline; previously the limit was only checked after a task
/// happened to finish, which never fires if a task hangs.
///
/// # Errors
/// `FetchError::Timeout` when the overall budget elapses, or
/// `FetchError::Http` when a download task panicked/was aborted.
async fn collect_results(
    join_set: &mut JoinSet<(RepoFile, Result<PathBuf, FetchError>)>,
    timeout_total: Option<Duration>,
    overall_start: tokio::time::Instant,
    on_progress: Option<&ProgressCallback>,
    total: usize,
    completed: &Arc<AtomicUsize>,
) -> Result<(HashMap<String, PathBuf>, Vec<FileFailure>), FetchError> {
    let mut file_map: HashMap<String, PathBuf> = HashMap::with_capacity(total);
    let mut failures: Vec<FileFailure> = Vec::new();
    loop {
        // Wait for the next finished task, bounded by the overall deadline.
        let next = if let Some(limit) = timeout_total {
            match tokio::time::timeout_at(overall_start + limit, join_set.join_next()).await {
                Ok(joined) => joined,
                Err(_elapsed) => {
                    // Budget exhausted while tasks were still running: cancel
                    // everything that remains.
                    join_set.abort_all();
                    return Err(FetchError::Timeout {
                        filename: String::from("(overall timeout exceeded)"),
                        seconds: limit.as_secs(),
                    });
                }
            }
        } else {
            join_set.join_next().await
        };
        let Some(join_result) = next else { break };
        let (file, download_result) =
            join_result.map_err(|e| FetchError::Http(format!("download task failed: {e}")))?;
        // fetch_add returns the previous value, so +1 is this file's rank.
        let completed_count = completed.fetch_add(1, Ordering::Relaxed) + 1;
        match download_result {
            Ok(path) => {
                let remaining = total.saturating_sub(completed_count);
                // Size is read back from disk purely for progress reporting;
                // a metadata failure degrades to reporting 0 bytes.
                let file_size = tokio::fs::metadata(&path)
                    .await
                    .map(|m| m.len())
                    .unwrap_or(0);
                let event = progress::completed_event(file.filename.as_str(), file_size, remaining);
                if let Some(cb) = on_progress {
                    cb(&event);
                }
                file_map.insert(file.filename, path);
            }
            Err(e) => {
                failures.push(FileFailure {
                    filename: file.filename,
                    reason: e.to_string(),
                    retryable: retry::is_retryable(&e),
                });
            }
        }
    }
    Ok((file_map, failures))
}
/// Converts collected per-file results into the final map, surfacing partial
/// failures (including the snapshot path when any file succeeded) and the
/// nothing-matched case.
fn validate_download_results(
    file_map: HashMap<String, PathBuf>,
    failures: Vec<FileFailure>,
    repo_id: &str,
) -> Result<HashMap<String, PathBuf>, FetchError> {
    match (failures.is_empty(), file_map.is_empty()) {
        (false, _) => {
            // Derive the snapshot root from any successful file so callers
            // can still locate what did download.
            let path = file_map
                .iter()
                .next()
                .map(|(filename, path)| snapshot_root(filename, path));
            Err(FetchError::PartialDownload { path, failures })
        }
        (true, true) => Err(FetchError::NoFilesMatched {
            repo_id: repo_id.to_owned(),
        }),
        (true, false) => Ok(file_map),
    }
}
/// Fetches extended file metadata (sizes, checksums) when either checksum
/// verification or chunked downloads require it; returns an empty map when
/// metadata is unnecessary or the fetch fails, degrading gracefully to
/// single-connection, unverified downloads.
async fn fetch_metadata_if_needed(
    config: Option<&FetchConfig>,
    repo_id: &str,
    verify_checksums: bool,
    chunk_threshold: u64,
) -> HashMap<String, RepoFile> {
    if !(verify_checksums || chunk_threshold < u64::MAX) {
        tracing::debug!("skipping metadata fetch (checksums disabled, chunk_threshold=MAX)");
        return HashMap::new();
    }
    tracing::debug!(
        "fetching extended metadata (checksums={verify_checksums}, chunk_threshold={chunk_threshold} bytes)"
    );
    let fetched = fetch_metadata_map(
        repo_id,
        config.and_then(|c| c.token.as_deref()),
        config.and_then(|c| c.revision.as_deref()),
    )
    .await;
    match fetched {
        Ok(map) => {
            let with_size = map.values().filter(|f| f.size.is_some()).count();
            tracing::debug!(
                files_with_size = with_size,
                total_files = map.len(),
                "metadata fetch succeeded"
            );
            map
        }
        Err(e) => {
            // Non-fatal: downloads proceed without sizes/digests.
            tracing::warn!(
                error = %e,
                "metadata fetch failed; chunked downloads disabled for this run"
            );
            HashMap::new()
        }
    }
}
/// Emits a debug record for a finished download attempt, including throughput
/// (in megabits per second) when the file size is known.
fn log_download_result(
    filename: &str,
    result: &Result<PathBuf, FetchError>,
    file_size: Option<u64>,
    elapsed: std::time::Duration,
) {
    if let Err(e) = result {
        tracing::debug!(
            filename = %filename,
            error = %e,
            "download failed"
        );
        return;
    }
    match file_size {
        Some(size) => {
            #[allow(clippy::as_conversions, clippy::cast_precision_loss)]
            let mbps = (size as f64 * 8.0) / elapsed.as_secs_f64() / 1_000_000.0;
            tracing::debug!(
                filename = %filename,
                elapsed_secs = format_args!("{:.1}", elapsed.as_secs_f64()),
                throughput_mbps = format_args!("{mbps:.1}"),
                "download complete"
            );
        }
        None => {
            tracing::debug!(
                filename = %filename,
                elapsed_secs = format_args!("{:.1}", elapsed.as_secs_f64()),
                "download complete (size unknown)"
            );
        }
    }
}
/// Estimates the bytes still to download and prints/logs a warning when the
/// available disk space looks insufficient or tight. Purely advisory: it
/// never blocks or fails the download.
fn check_disk_space(
    cache_dir: &std::path::Path,
    files: &[RepoFile],
    metadata_map: &HashMap<String, RepoFile>,
    repo_folder: &str,
    revision: &str,
) {
    use fs2::available_space;
    // Sum sizes of files not already cached; unknown sizes contribute 0, so
    // the estimate is a lower bound.
    let mut download_bytes: u64 = 0;
    for file in files {
        if resolve_cached_file(cache_dir, repo_folder, revision, file.filename.as_str()).is_some() {
            continue;
        }
        let size = metadata_map
            .get(file.filename.as_str())
            .and_then(|m| m.size)
            .or(file.size)
            .unwrap_or(0);
        download_bytes = download_bytes.saturating_add(size);
    }
    if download_bytes == 0 {
        return;
    }
    let available = match available_space(cache_dir) {
        Ok(a) => a,
        Err(e) => {
            // Can't measure the filesystem: stay silent rather than guess.
            tracing::debug!(error = %e, "could not check available disk space");
            return;
        }
    };
    // Current total cache size across summaries (best-effort; 0 on error).
    let current_cache = crate::cache::cache_summary().ok().map_or(0, |summaries| {
        summaries
            .iter()
            .map(|s| s.total_size)
            .fold(0u64, u64::saturating_add)
    });
    let projected_cache = current_cache.saturating_add(download_bytes);
    let after_available = available.saturating_sub(download_bytes);
    #[allow(clippy::cast_precision_loss, clippy::as_conversions)]
    let fmt_gib = |v: u64| -> String {
        let gib = v as f64 / (1024.0 * 1024.0 * 1024.0);
        format!("{gib:.2} GiB")
    };
    if available < download_bytes {
        eprintln!(
            "warning: insufficient disk space \u{2014} download needs {}, only {} available \
(cache: {})",
            fmt_gib(download_bytes),
            fmt_gib(available),
            fmt_gib(current_cache),
        );
        tracing::warn!(
            download_bytes,
            available,
            current_cache,
            "insufficient disk space"
        );
    } else {
        eprintln!(
            " Disk: cache {} \u{2192} {} after download ({} to fetch, {} available)",
            fmt_gib(current_cache),
            fmt_gib(projected_cache),
            fmt_gib(download_bytes),
            fmt_gib(after_available),
        );
        // Warn when less than ~10% headroom would remain.
        #[allow(clippy::cast_precision_loss, clippy::as_conversions)]
        let ratio = available as f64 / download_bytes as f64;
        if ratio < 1.1 {
            eprintln!(
                "warning: disk space is tight \u{2014} only {} will remain after download",
                fmt_gib(after_available),
            );
            tracing::warn!(after_available, "disk space is tight after download");
        }
    }
}
/// Returns the cached snapshot path for `filename` when the revision ref
/// resolves to a commit hash and the file already exists on disk; `None`
/// otherwise.
fn resolve_cached_file(
    cache_dir: &std::path::Path,
    repo_folder: &str,
    revision: &str,
    filename: &str,
) -> Option<PathBuf> {
    let repo_dir = cache_dir.join(repo_folder);
    // `read_ref` maps a revision name to a commit hash; without it the
    // snapshot directory cannot be located.
    let commit_hash = crate::cache::read_ref(&repo_dir, revision)?;
    let candidate = repo_dir.join("snapshots").join(commit_hash).join(filename);
    if !candidate.exists() {
        return None;
    }
    tracing::debug!(
        filename = %filename,
        path = %candidate.display(),
        "file resolved from local cache"
    );
    Some(candidate)
}
/// Attempts to satisfy a whole-repo fetch purely from the local cache,
/// honoring the configured output directory, revision, and include/exclude
/// globs. `Ok(None)` means the network path must be taken.
fn try_resolve_repo_from_cache(
    config: Option<&FetchConfig>,
    repo_id: &str,
) -> Result<Option<HashMap<String, PathBuf>>, FetchError> {
    let cache_dir = match config.and_then(|c| c.output_dir.clone()) {
        Some(dir) => dir,
        None => crate::cache::hf_cache_dir()?,
    };
    let repo_folder = chunked::repo_folder_name(repo_id);
    let resolved = try_resolve_all_from_cache(
        &cache_dir,
        repo_folder.as_str(),
        config.and_then(|c| c.revision.as_deref()).unwrap_or("main"),
        config.and_then(|c| c.include.as_ref()),
        config.and_then(|c| c.exclude.as_ref()),
    );
    Ok(resolved)
}
/// Walks the snapshot directory for `revision` and returns every cached file
/// passing the glob filters, or `None` when the snapshot is absent or yields
/// no matches (the caller then falls back to a network listing).
fn try_resolve_all_from_cache(
    cache_dir: &std::path::Path,
    repo_folder: &str,
    revision: &str,
    include: Option<&globset::GlobSet>,
    exclude: Option<&globset::GlobSet>,
) -> Option<HashMap<String, PathBuf>> {
    let repo_dir = cache_dir.join(repo_folder);
    let snapshot_dir = repo_dir
        .join("snapshots")
        .join(crate::cache::read_ref(&repo_dir, revision)?);
    if !snapshot_dir.is_dir() {
        return None;
    }
    let mut file_map = HashMap::new();
    collect_cached_files_recursive(&snapshot_dir, &snapshot_dir, include, exclude, &mut file_map);
    if file_map.is_empty() {
        None
    } else {
        tracing::debug!(
            cached_files = file_map.len(),
            "all files resolved from local cache (no network)"
        );
        Some(file_map)
    }
}
/// Recursively collects files under `dir`, inserting entries keyed by their
/// path relative to `base` (normalized to forward slashes) when they pass the
/// include/exclude filters. Unreadable directories and entries are skipped
/// silently.
fn collect_cached_files_recursive(
    base: &std::path::Path,
    dir: &std::path::Path,
    include: Option<&globset::GlobSet>,
    exclude: Option<&globset::GlobSet>,
    file_map: &mut HashMap<String, PathBuf>,
) {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return;
    };
    for path in entries.flatten().map(|entry| entry.path()) {
        if path.is_dir() {
            collect_cached_files_recursive(base, &path, include, exclude, file_map);
            continue;
        }
        let Ok(relative) = path.strip_prefix(base) else {
            continue;
        };
        // Normalize Windows separators so keys match repo-style paths.
        let filename = relative.to_string_lossy().replace('\\', "/");
        if file_matches(filename.as_str(), include, exclude) {
            file_map.insert(filename, path);
        }
    }
}
/// Walks up from a file's cached path to the snapshot root directory by
/// popping one path component for each component of the repo-relative
/// `filename`.
fn snapshot_root(filename: &str, path: &std::path::Path) -> PathBuf {
    let mut root = path.to_path_buf();
    for _ in std::path::Path::new(filename).components() {
        // Stop early if the path runs out of components to pop.
        if !root.pop() {
            break;
        }
    }
    root
}
/// Heuristically detects an HTTP 416 (Range Not Satisfiable) failure from the
/// error's display text; the caller uses this to trigger a direct-download
/// fallback.
fn is_range_not_satisfiable(result: &Result<PathBuf, FetchError>) -> bool {
    let Err(e) = result else {
        return false;
    };
    let msg = e.to_string();
    msg.contains("416") || msg.contains("Range Not Satisfiable")
}
/// Builds a `DownloadPlan` from the file list and, for every knob the user
/// did not set explicitly, overwrites the settings with the plan's
/// recommended concurrency, connections-per-file, and chunk threshold.
#[allow(clippy::too_many_arguments)]
fn merge_plan_recommended(
    settings: &mut DownloadSettings,
    config: Option<&FetchConfig>,
    files: &[RepoFile],
    metadata_map: &HashMap<String, RepoFile>,
    cache_dir: &std::path::Path,
    repo_folder: &str,
    revision: &str,
) {
    let Some(cfg) = config else {
        return;
    };
    let plan_files: Vec<crate::plan::FilePlan> = files
        .iter()
        .map(|repo_file| {
            let name = repo_file.filename.as_str();
            crate::plan::FilePlan {
                filename: repo_file.filename.clone(),
                // Unknown sizes count as zero bytes in the plan totals.
                size: metadata_map.get(name).and_then(|m| m.size).unwrap_or(0),
                cached: resolve_cached_file(cache_dir, repo_folder, revision, name).is_some(),
            }
        })
        .collect();
    let total_bytes: u64 = plan_files.iter().map(|f| f.size).sum();
    let cached_bytes: u64 = plan_files
        .iter()
        .filter(|f| f.cached)
        .map(|f| f.size)
        .sum();
    let plan = crate::plan::DownloadPlan {
        // repo_id/revision are not populated at this call site.
        repo_id: String::new(),
        revision: String::new(),
        files: plan_files,
        total_bytes,
        cached_bytes,
        download_bytes: total_bytes.saturating_sub(cached_bytes),
    };
    let Ok(rec) = plan.recommended_config() else {
        return;
    };
    if !cfg.explicit.concurrency {
        settings.concurrency = rec.concurrency();
    }
    if !cfg.explicit.connections_per_file {
        settings.connections_per_file = rec.connections_per_file();
    }
    if !cfg.explicit.chunk_threshold {
        settings.chunk_threshold = rec.chunk_threshold();
    }
    tracing::debug!(
        concurrency = settings.concurrency,
        connections_per_file = settings.connections_per_file,
        chunk_threshold = settings.chunk_threshold,
        "merged plan-recommended settings"
    );
}
/// Fetches the repository's extended file listing and keys each entry by its
/// filename.
async fn fetch_metadata_map(
    repo_id: &str,
    token: Option<&str>,
    revision: Option<&str>,
) -> Result<HashMap<String, RepoFile>, FetchError> {
    let files = repo::list_repo_files_with_metadata(repo_id, token, revision).await?;
    Ok(files
        .into_iter()
        .map(|file| (file.filename.clone(), file))
        .collect())
}