use super::cache::Cache;
use super::cache_lock::{CacheLockGuard, acquire_cache_lock};
use super::crate_facts::CrateFacts;
use super::crate_spec::CrateSpec;
use super::progress::Progress;
use super::request_tracker::RequestTracker;
use super::{CrateRef, CratesData, ProviderResult};
use crate::Result;
use chrono::Utc;
use core::time::Duration;
use ohno::IntoAppError;
use crate::{HashMap, HashSet};
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
/// Owns one provider per data source plus the shared progress reporter and the
/// cache lock, and drives them to assemble `CrateFacts` for a set of crates.
///
/// Field order is significant: Rust drops struct fields in declaration order, so
/// `_cache_lock` (declared last) is released only after every provider is dropped.
pub struct Collector {
/// Resolves crate references through `get_crates_data` during `collect`.
crates_provider: super::crates::Provider,
/// Supplies `hosting_data`; constructed from the optional GitHub/Codeberg tokens.
hosting_provider: super::hosting::Provider,
/// Supplies `advisory_data`.
advisories_provider: super::advisories::Provider,
/// Supplies `codebase_data`.
codebase_provider: super::codebase::Provider,
/// Supplies `coverage_data`.
coverage_provider: super::coverage::Provider,
/// Supplies `docs_data`.
docs_provider: super::docs::Provider,
/// Shared progress reporter; also cloned into the crates and advisories providers.
progress: Arc<dyn Progress>,
/// Guard returned by `acquire_cache_lock`; never read, only kept alive for as
/// long as the collector exists.
_cache_lock: CacheLockGuard,
}
/// Hand-written `Debug` output for [`Collector`].
///
/// Every provider is printed through its own `Debug` implementation, the
/// `progress` trait object is rendered as a fixed placeholder string, and the
/// cache lock guard is omitted (hence the non-exhaustive finish).
impl core::fmt::Debug for Collector {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        // Label/value pairs in the exact order they appear in the output.
        let shown: [(&str, &dyn core::fmt::Debug); 7] = [
            ("crates_provider", &self.crates_provider),
            ("hosting_provider", &self.hosting_provider),
            ("advisories_provider", &self.advisories_provider),
            ("codebase_provider", &self.codebase_provider),
            ("coverage_provider", &self.coverage_provider),
            ("docs_provider", &self.docs_provider),
            ("progress", &"<dyn Progress>"),
        ];

        let mut out = f.debug_struct("Collector");
        for (label, value) in shown {
            let _ = out.field(label, value);
        }
        out.finish_non_exhaustive()
    }
}
impl Collector {
/// Builds a collector with one provider per data source.
///
/// Steps, in order:
/// 1. wraps `progress` in an `Arc` and sets the phase to `"Preparing"`;
/// 2. creates the `crates`, `hosting`, `codebase`, `coverage`, `advisories` and
///    `docs` subdirectories under `cache_dir`;
/// 3. acquires the cache lock for `cache_dir`;
/// 4. constructs the caches and providers. The docs cache is given a TTL of
///    `Duration::MAX`; all other caches use the TTL passed by the caller.
///
/// `ignore_cached` is forwarded unchanged to every cache and to the crates provider.
///
/// # Errors
///
/// Returns an error if a cache directory cannot be created, if the cache lock
/// cannot be acquired, or if the crates, advisories or hosting provider fails to
/// initialize.
#[expect(clippy::too_many_arguments, reason = "all cache TTL parameters are necessary for configuration")]
pub async fn new(
    github_token: Option<&str>,
    codeberg_token: Option<&str>,
    cache_dir: impl AsRef<Path>,
    crates_cache_ttl: Duration,
    hosting_cache_ttl: Duration,
    codebase_cache_ttl: Duration,
    coverage_cache_ttl: Duration,
    advisories_cache_ttl: Duration,
    ignore_cached: bool,
    progress: impl Progress + 'static,
) -> Result<Self> {
    let progress: Arc<dyn Progress> = Arc::new(progress);
    progress.set_phase("Preparing");

    // Resolve the cache root once; every subdirectory and the lock hang off it.
    let cache_root = cache_dir.as_ref();

    let crates_cache_dir = create_cache_dir(cache_root, "crates")?;
    let hosting_cache_dir = create_cache_dir(cache_root, "hosting")?;
    let codebase_cache_dir = create_cache_dir(cache_root, "codebase")?;
    let coverage_cache_dir = create_cache_dir(cache_root, "coverage")?;
    let advisories_cache_dir = create_cache_dir(cache_root, "advisories")?;
    let docs_cache_dir = create_cache_dir(cache_root, "docs")?;

    let cache_lock = acquire_cache_lock(cache_root).await?;

    let hosting_cache = Cache::new(hosting_cache_dir, hosting_cache_ttl, ignore_cached);
    let codebase_cache = Cache::new(codebase_cache_dir, codebase_cache_ttl, ignore_cached);
    let coverage_cache = Cache::new(coverage_cache_dir, coverage_cache_ttl, ignore_cached);
    let advisories_cache = Cache::new(advisories_cache_dir, advisories_cache_ttl, ignore_cached);
    let docs_cache = Cache::new(docs_cache_dir, Duration::MAX, ignore_cached);

    // Providers are built in a fixed order: the two async constructors first
    // (crates, then advisories), followed by the synchronous ones.
    let crates_provider = super::crates::Provider::new(
        &crates_cache_dir,
        crates_cache_ttl,
        Arc::clone(&progress),
        Utc::now(),
        ignore_cached,
        None,
    )
    .await?;
    let advisories_provider = super::advisories::Provider::new(&advisories_cache, Arc::clone(&progress)).await?;
    let hosting_provider = super::hosting::Provider::new(github_token, codeberg_token, hosting_cache)?;
    let codebase_provider = super::codebase::Provider::new(codebase_cache);
    let coverage_provider = super::coverage::Provider::new(coverage_cache, None);
    let docs_provider = super::docs::Provider::new(docs_cache, None);

    Ok(Self {
        crates_provider,
        hosting_provider,
        advisories_provider,
        codebase_provider,
        coverage_provider,
        docs_provider,
        progress,
        _cache_lock: cache_lock,
    })
}
pub async fn collect(
&self,
crate_refs: &[CrateRef],
suggestions: bool,
) -> Result<impl Iterator<Item = CrateFacts>> {
if crate_refs.is_empty() {
return Ok(Vec::new().into_iter());
}
let crate_refs: Vec<_> = crate_refs.iter().cloned().collect::<HashSet<_>>().into_iter().collect();
self.progress.set_phase("Identifying");
let crate_data = self
.crates_provider
.get_crates_data(&crate_refs, self.progress.as_ref(), suggestions)
.await;
let crate_data: Vec<_> = crate_data
.fold(HashMap::default(), |mut map, (crate_spec, provider_result)| {
let _ = map.entry(crate_spec).or_insert(provider_result);
map
})
.into_iter()
.collect();
self.progress.set_phase("Querying");
let collected_facts = self.query_providers(crate_data).await;
self.progress.done();
Ok(collected_facts.into_iter())
}
/// Queries the advisories, docs, hosting, codebase and coverage providers for
/// every crate whose crates data was found, and merges the answers into one
/// `CrateFacts` per crate spec.
///
/// Each crate starts out with all five secondary fields set to
/// `Unavailable("not queried")`; crates whose crates data was not found keep
/// those placeholders. The five provider calls are awaited together via
/// `tokio::join!`. A provider result for a spec that is not in the map is ignored.
async fn query_providers(&self, crates_data: Vec<(CrateSpec, ProviderResult<CratesData>)>) -> Vec<CrateFacts> {
    let request_tracker = RequestTracker::new(&self.progress);

    // Seed the map with placeholder facts keyed by crate spec.
    let mut facts_map: HashMap<CrateSpec, CrateFacts> = crates_data
        .into_iter()
        .map(|(spec, crates_outcome)| {
            let seeded = CrateFacts {
                crate_spec: spec.clone(),
                crates_data: crates_outcome,
                hosting_data: ProviderResult::Unavailable("not queried".into()),
                advisory_data: ProviderResult::Unavailable("not queried".into()),
                codebase_data: ProviderResult::Unavailable("not queried".into()),
                coverage_data: ProviderResult::Unavailable("not queried".into()),
                docs_data: ProviderResult::Unavailable("not queried".into()),
            };
            (spec, seeded)
        })
        .collect();

    // Only crates that were actually found are worth sending to the other providers.
    let all_queryable_specs: Vec<CrateSpec> = facts_map
        .iter()
        .filter_map(|(spec, facts)| facts.crates_data.is_found().then(|| spec.clone()))
        .collect();

    if all_queryable_specs.is_empty() {
        return facts_map.into_values().collect();
    }

    let (advisory_iter, docs_iter, hosting_iter, codebase_iter, coverage_iter) = tokio::join!(
        self.advisories_provider.get_advisory_data(all_queryable_specs.clone()),
        self.docs_provider.get_docs_data(all_queryable_specs.clone(), &request_tracker),
        self.hosting_provider
            .get_hosting_data(all_queryable_specs.clone(), &request_tracker),
        self.codebase_provider
            .get_codebase_data(all_queryable_specs.clone(), &request_tracker),
        self.coverage_provider.get_coverage_data(all_queryable_specs, &request_tracker),
    );

    // Writes each provider's results into the matching field of the stored facts.
    // A macro is used because every result stream targets a different field.
    macro_rules! merge_results {
        ($($results:expr => $field:ident),+ $(,)?) => {
            $(
                for (spec, outcome) in $results {
                    if let Some(entry) = facts_map.get_mut(&spec) {
                        entry.$field = outcome;
                    }
                }
            )+
        };
    }

    merge_results! {
        advisory_iter => advisory_data,
        docs_iter => docs_data,
        hosting_iter => hosting_data,
        codebase_iter => codebase_data,
        coverage_iter => coverage_data,
    }

    facts_map.into_values().collect()
}
}
/// Ensures that `base_path/name` exists as a directory and returns its path.
///
/// Missing parent directories are created as well. On Windows only, when the
/// directory did not exist before this call and `name` is exactly `"crates"`,
/// `disable_directory_compression` is invoked on the new directory.
///
/// # Errors
///
/// Returns an error, with the cache name in its context message, if the
/// directory cannot be created.
fn create_cache_dir(base_path: impl AsRef<Path>, name: impl AsRef<str>) -> Result<PathBuf> {
    let name_str = name.as_ref();
    let target_dir = base_path.as_ref().join(name_str);

    // Must be sampled before creation; afterwards the directory always exists.
    #[cfg(windows)]
    let already_present = target_dir.exists();

    fs::create_dir_all(&target_dir).into_app_err_with(|| format!("creating `{name_str}` cache directory"))?;

    #[cfg(windows)]
    if name_str == "crates" && !already_present {
        disable_directory_compression(&target_dir);
    }

    Ok(target_dir)
}
/// Best-effort attempt to switch off NTFS compression on the directory at `path`.
///
/// Opens a handle to the existing directory and issues `FSCTL_SET_COMPRESSION`
/// with `COMPRESSION_FORMAT_NONE`. Every failure (the directory cannot be opened,
/// the file system rejects the request, ...) is deliberately ignored: this is an
/// optimization, not a requirement, so the function returns nothing.
///
/// The handle is opened with both `FILE_READ_DATA` and `FILE_WRITE_DATA`. The
/// `FSCTL_SET_COMPRESSION` control code encodes read *and* write access as its
/// required access, so a write-only handle is rejected with "access denied"
/// before the request ever reaches the file system.
#[cfg(windows)]
fn disable_directory_compression(path: impl AsRef<Path>) {
    use std::ffi::OsStr;
    use std::os::windows::ffi::OsStrExt;
    use std::ptr::{addr_of, addr_of_mut};
    use windows::Win32::Foundation::{CloseHandle, HANDLE};
    use windows::Win32::Storage::FileSystem::{
        COMPRESSION_FORMAT_NONE, CreateFileW, FILE_FLAG_BACKUP_SEMANTICS, FILE_READ_DATA, FILE_SHARE_READ, FILE_SHARE_WRITE,
        FILE_WRITE_DATA, OPEN_EXISTING,
    };
    use windows::Win32::System::IO::DeviceIoControl;
    use windows::Win32::System::Ioctl::FSCTL_SET_COMPRESSION;
    use windows::core::HSTRING;

    /// Closes the wrapped handle when it goes out of scope.
    struct HandleGuard(HANDLE);

    impl Drop for HandleGuard {
        fn drop(&mut self) {
            // SAFETY: the guard is only ever constructed from a handle that
            // `CreateFileW` returned successfully, and it is closed exactly once here.
            unsafe {
                let _ = CloseHandle(self.0);
            }
        }
    }

    let path = path.as_ref();
    let wide_chars: Vec<_> = OsStr::new(path).encode_wide().collect();
    let path_wide = HSTRING::from_wide(&wide_chars);

    // SAFETY: `path_wide` is a valid wide string that outlives the call; all
    // optional pointer arguments are `None`.
    let handle = unsafe {
        CreateFileW(
            &path_wide,
            // Read + write access is what FSCTL_SET_COMPRESSION requires of the handle.
            FILE_READ_DATA.0 | FILE_WRITE_DATA.0,
            FILE_SHARE_READ | FILE_SHARE_WRITE,
            None,
            OPEN_EXISTING,
            // Required by CreateFileW in order to open a directory.
            FILE_FLAG_BACKUP_SEMANTICS,
            None,
        )
    };
    let Ok(handle) = handle else {
        return;
    };
    let _guard = HandleGuard(handle);

    let compression_format = COMPRESSION_FORMAT_NONE;
    let mut bytes_returned: u32 = 0;

    // SAFETY: `handle` is open for the duration of the call (the guard closes it
    // afterwards); the input buffer points at a live 16-bit compression format and
    // its size is passed alongside; `bytes_returned` is a live, writable `u32`.
    #[expect(clippy::cast_possible_truncation, reason = "size_of::<u16>() is always 2, which fits in u32")]
    let _ = unsafe {
        DeviceIoControl(
            handle,
            FSCTL_SET_COMPRESSION,
            Some(addr_of!(compression_format).cast()),
            size_of::<u16>() as u32,
            None,
            0,
            Some(addr_of_mut!(bytes_returned)),
            None,
        )
    };
}