use crate::{Error, FxHashSet, Resolver};
use aube_registry::Packument;
use aube_registry::client::RegistryClient;
use aube_util::adaptive::AdaptiveLimit;
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::task::JoinSet;
/// Tracks the packument fetch tasks spawned for the resolver, together with the
/// configuration that is copied into every spawned task.
pub(super) struct FetchScheduler {
/// Spawned `fetch_one_packument` tasks. Each yields `(name, packument, flag)` on
/// success, where the flag is `true` only when the packument came from `crate::primer`.
in_flight: JoinSet<Result<(String, Packument, bool), Error>>,
/// Names for which `ensure_fetch` has spawned a task and that have not yet been
/// removed again through `release_in_flight`.
in_flight_names: FxHashSet<String>,
/// Names recorded by `note_primer_seeded` and consumed by `take_primer_seeded`.
primer_seeded_names: FxHashSet<String>,
/// Shared limit; every task acquires a permit from it before doing any lookup.
sem: Arc<AdaptiveLimit>,
/// Registry client cloned from the resolver and shared with every task.
client: Arc<RegistryClient>,
/// Copy of `resolver.packument_cache_dir`; consulted by tasks when `needs_time` is false.
cache_dir: Option<PathBuf>,
/// Copy of `resolver.packument_full_cache_dir`; consulted by tasks when `needs_time` is true.
full_cache_dir: Option<PathBuf>,
/// Copy of the `exclude` set of `resolver.minimum_release_age` (empty when that option
/// is unset). A name in this set always counts as covered by the primer in `ensure_fetch`.
mra_exclude: HashSet<String>,
/// Copy of `resolver.force_metadata_primer`, forwarded to every task.
force_metadata_primer: bool,
/// Forwarded to every task; selects the "full" cache directory and fetch variants.
needs_time: bool,
}
/// Value produced by `FetchScheduler::join_next`, i.e. by `JoinSet::join_next` on the
/// in-flight set: the outer `Option` is whatever `join_next` yields, the outer `Result`
/// carries a `JoinError` for a task that did not complete normally, and the inner
/// `Result` is the value returned by the fetch task itself.
pub(super) type FetchOutcome =
Option<Result<Result<(String, Packument, bool), Error>, tokio::task::JoinError>>;
impl FetchScheduler {
    /// Creates a scheduler with nothing in flight.
    ///
    /// The registry client, both cache directories, the primer flag and the
    /// minimum-release-age exclusion set are copied out of `resolver`; `sem` and
    /// `needs_time` are stored as given.
    pub(super) fn new(resolver: &Resolver, sem: Arc<AdaptiveLimit>, needs_time: bool) -> Self {
        // No minimum-release-age policy means nothing is excluded.
        let mra_exclude = match resolver.minimum_release_age.as_ref() {
            Some(policy) => policy.exclude.clone(),
            None => Default::default(),
        };
        let client = Arc::clone(&resolver.client);
        let cache_dir = resolver.packument_cache_dir.clone();
        let full_cache_dir = resolver.packument_full_cache_dir.clone();

        Self {
            in_flight: JoinSet::new(),
            in_flight_names: FxHashSet::default(),
            primer_seeded_names: FxHashSet::default(),
            sem,
            client,
            cache_dir,
            full_cache_dir,
            mra_exclude,
            force_metadata_primer: resolver.force_metadata_primer,
            needs_time,
        }
    }

    /// Number of fetch tasks currently held in the join set.
    pub(super) fn in_flight_count(&self) -> usize {
        self.in_flight.len()
    }

    /// Spawns a fetch task for `name` unless the name is already tracked as in flight.
    ///
    /// The task is told the primer covers the cutoff when `name` is in the exclusion
    /// set, when `published_by` is `None`, or when `crate::primer::covers_cutoff`
    /// accepts the given `published_by` value.
    pub(super) fn ensure_fetch(&mut self, name: &str, published_by: Option<&str>) {
        let already_tracked = self.in_flight_names.contains(name);
        if !already_tracked {
            let owned_name = name.to_string();
            self.in_flight_names.insert(owned_name.clone());

            // Excluded names short-circuit: the cutoff check is not consulted for them.
            let primer_covers_cutoff = if self.mra_exclude.contains(name) {
                true
            } else {
                match published_by {
                    Some(cutoff) => crate::primer::covers_cutoff(cutoff),
                    None => true,
                }
            };

            let inputs = FetchInputs {
                name: owned_name,
                client: Arc::clone(&self.client),
                cache_dir: self.cache_dir.clone(),
                full_cache_dir: self.full_cache_dir.clone(),
                primer_covers_cutoff,
                force_metadata_primer: self.force_metadata_primer,
                sem: Arc::clone(&self.sem),
                needs_time: self.needs_time,
            };
            self.in_flight.spawn(fetch_one_packument(inputs));
        }
    }

    /// Waits for the next fetch task to finish and hands back its outcome unchanged.
    pub(super) async fn join_next(&mut self) -> FetchOutcome {
        let outcome = self.in_flight.join_next().await;
        outcome
    }

    /// Stops tracking `name` as in flight, so a later `ensure_fetch` may spawn it again.
    pub(super) fn release_in_flight(&mut self, name: &str) {
        self.in_flight_names.remove(name);
    }

    /// Records `name` in the primer-seeded set.
    pub(super) fn note_primer_seeded(&mut self, name: String) {
        self.primer_seeded_names.insert(name);
    }

    /// Removes `name` from the primer-seeded set, returning whether it was present.
    pub(super) fn take_primer_seeded(&mut self, name: &str) -> bool {
        self.primer_seeded_names.remove(name)
    }

    /// Awaits every remaining task and discards the results.
    ///
    /// Only the join set is emptied; the tracked name sets are left untouched.
    pub(super) async fn drain(&mut self) {
        loop {
            if self.in_flight.join_next().await.is_none() {
                break;
            }
        }
    }
}
/// Owned bundle of everything one `fetch_one_packument` task needs; built by
/// `FetchScheduler::ensure_fetch` and moved into the spawned future.
struct FetchInputs {
/// Package name whose packument is being fetched.
name: String,
/// Registry client used for cache lookups, cache seeding and network fetches.
client: Arc<RegistryClient>,
/// Cache directory used when `needs_time` is false.
cache_dir: Option<PathBuf>,
/// Cache directory used when `needs_time` is true.
full_cache_dir: Option<PathBuf>,
/// Precomputed in `ensure_fetch`; the primer is only considered when this is true.
primer_covers_cutoff: bool,
/// When true, the primer is considered even if the client does not report the default
/// npm registry for this name, and tarball URLs of primer packuments are rewritten
/// through `client.tarball_url`.
force_metadata_primer: bool,
/// Limit from which the task acquires a permit before doing any lookup.
sem: Arc<AdaptiveLimit>,
/// Selects the "full" cache directory and the `*_with_time_*` / `*_full_*` client calls.
needs_time: bool,
}
/// Produces the packument for `inputs.name`, trying the on-disk cache first, then the
/// metadata primer, and finally the registry.
///
/// A permit is acquired from `inputs.sem` before any lookup, and an outcome is recorded on
/// it on every exit path:
/// - disk-cache hit or primer hit: `record_cancelled` (the function returns before any
///   registry fetch),
/// - registry fetch succeeded: `record_success`,
/// - registry fetch failed: `record_throttle` when the error reports `is_throttle()`,
///   otherwise `record_cancelled`.
///
/// # Returns
/// `(name, packument, from_primer)`. The flag is `true` only when the packument was taken
/// from `crate::primer`; disk-cache and registry results carry `false`.
///
/// # Errors
/// `Error::Registry(name, message)` when the registry fetch fails; `message` is the
/// underlying error rendered with `to_string()`.
async fn fetch_one_packument(inputs: FetchInputs) -> Result<(String, Packument, bool), Error> {
    let FetchInputs {
        name,
        client,
        cache_dir,
        full_cache_dir,
        primer_covers_cutoff,
        force_metadata_primer,
        sem,
        needs_time,
    } = inputs;

    // Diagnostics guards: bound to named locals so they live until the function returns.
    let _diag_span =
        aube_util::diag::Span::new(aube_util::diag::Category::Resolver, "packument_fetch")
            .with_meta_fn(|| format!(r#"{{"name":{}}}"#, aube_util::diag::jstr(&name)));
    let _diag_inflight = aube_util::diag::inflight(aube_util::diag::Slot::Pack);

    // Measure how long this task waited for a permit; waits of 1 ms or less are not
    // reported as an event but are still attributed below.
    let permit_wait_started = std::time::Instant::now();
    let permit = sem.acquire().await;
    let permit_wait = permit_wait_started.elapsed();
    if permit_wait.as_millis() > 1 {
        aube_util::diag::event_lazy(
            aube_util::diag::Category::Resolver,
            "packument_permit_wait",
            permit_wait,
            || format!(r#"{{"name":{}}}"#, aube_util::diag::jstr(&name)),
        );
    }
    aube_util::diag::attribute_wait(aube_util::diag::Slot::Pack, &name, permit_wait);
    let _holder_guard = aube_util::diag::register_holder(aube_util::diag::Slot::Pack, &name);

    // Step 1: on-disk cache. `needs_time` picks the "full" cache; without a matching
    // directory the lookup result is the default (no packument).
    let mut cached = if needs_time {
        match full_cache_dir.as_ref() {
            Some(dir) => client.cached_full_packument_lookup(&name, dir),
            None => Default::default(),
        }
    } else if let Some(ref dir) = cache_dir {
        client.cached_packument_lookup(&name, dir)
    } else {
        Default::default()
    };
    if let Some(packument) = cached.packument.take() {
        aube_util::diag::instant_lazy(
            aube_util::diag::Category::Resolver,
            "packument_disk_hit",
            || format!(r#"{{"name":{}}}"#, aube_util::diag::jstr(&name)),
        );
        permit.record_cancelled();
        return Ok((name, packument, false));
    }

    // Step 2: metadata primer. Considered only when the caller said the primer covers the
    // cutoff and either the primer is forced or the client reports the default npm
    // registry for this name; skipped when the cache lookup flagged its entry as stale.
    let use_metadata_primer = (force_metadata_primer
        || client.uses_default_npm_registry_for(&name))
        && primer_covers_cutoff;
    if use_metadata_primer
        && !cached.stale
        && let Some(seed) = crate::primer::get(&name)
    {
        let mut packument = seed.packument();
        if force_metadata_primer {
            // Re-point every version's tarball at the URL the client computes for it.
            for version in packument.versions.values_mut() {
                let tarball = client.tarball_url(&version.name, &version.version);
                if let Some(dist) = version.dist.as_mut() {
                    dist.tarball = tarball;
                }
            }
        }
        // Write the primer packument into whichever cache the disk lookup above used.
        if needs_time {
            if let Some(dir) = full_cache_dir.as_ref() {
                client.seed_full_packument_cache(
                    &name,
                    dir,
                    &packument,
                    seed.etag.as_deref(),
                    seed.last_modified.as_deref(),
                    false,
                );
            }
        } else if let Some(dir) = cache_dir.as_ref() {
            client.seed_packument_cache(
                &name,
                dir,
                &packument,
                seed.etag.as_deref(),
                seed.last_modified.as_deref(),
                false,
            );
        }
        aube_util::diag::instant_lazy(
            aube_util::diag::Category::Resolver,
            "packument_primer_hit",
            || format!(r#"{{"name":{}}}"#, aube_util::diag::jstr(&name)),
        );
        permit.record_cancelled();
        return Ok((name, packument, true));
    }

    // Step 3: registry. The earlier lookup result is handed to the cached fetch variants;
    // without a usable cache directory a plain fetch is issued.
    let fetch_outcome = if needs_time {
        match full_cache_dir.as_ref() {
            Some(dir) => {
                client
                    .fetch_packument_with_time_cached_after_lookup(&name, dir, cached)
                    .await
            }
            None => client.fetch_packument(&name).await,
        }
    } else if let Some(ref dir) = cache_dir {
        client
            .fetch_packument_cached_after_lookup(&name, dir, cached)
            .await
    } else {
        client.fetch_packument(&name).await
    };
    let packument = match fetch_outcome {
        Ok(fetched) => {
            permit.record_success();
            fetched
        }
        Err(e) => {
            if e.is_throttle() {
                permit.record_throttle();
            } else {
                permit.record_cancelled();
            }
            // `name` is owned and unused after this point, so move it into the error.
            return Err(Error::Registry(name, e.to_string()));
        }
    };
    aube_util::diag::instant_lazy(
        aube_util::diag::Category::Resolver,
        "packument_network_hit",
        || format!(r#"{{"name":{}}}"#, aube_util::diag::jstr(&name)),
    );
    Ok((name, packument, false))
}