use std::{
ffi::{OsStr, OsString},
fs::{self, File, OpenOptions},
io::{self, BufReader, Read, Seek, Write},
mem,
path::{Path, PathBuf},
sync::{Arc, Mutex},
time::{Duration, SystemTime},
};
use cargo_metadata::Version;
use crates_index::Index;
use flate2::read::GzDecoder;
use futures_util::future::try_join_all;
use miette::{NamedSource, SourceOffset};
use reqwest::Url;
use serde::{Deserialize, Serialize};
use tar::Archive;
use tracing::{error, info, log::warn, trace};
use crate::{
errors::{
CacheAcquireError, CacheCommitError, CommandError, CriteriaChangeError,
CriteriaChangeErrors, DiffError, DownloadError, FetchAndDiffError, FetchAuditError,
FetchError, FlockError, InvalidCriteriaError, JsonParseError, LoadJsonError, LoadTomlError,
SourceFile, StoreAcquireError, StoreCommitError, StoreCreateError, StoreJsonError,
StoreTomlError, StoreValidateError, StoreValidateErrors, TomlParseError, UnpackError,
},
flock::{FileLock, Filesystem},
format::{
self, AuditEntry, AuditKind, AuditedDependencies, AuditsFile, CommandHistory, ConfigFile,
CriteriaEntry, CriteriaName, Delta, DiffCache, DiffStat, FastMap, FetchCommand,
ForeignAuditsFile, ImportName, ImportsFile, MetaConfig, PackageName, PackageStr, SortedMap,
SAFE_TO_DEPLOY, SAFE_TO_RUN,
},
network::Network,
out::{progress_bar, IncProgressOnDrop},
resolver::{self, Conclusion},
serialization::{parse_from_value, spanned::Spanned, to_formatted_toml},
Config, PartialConfig,
};
const CACHE_DIFF_CACHE: &str = "diff-cache.toml";
const CACHE_COMMAND_HISTORY: &str = "command-history.json";
const CACHE_EMPTY_PACKAGE: &str = "empty";
const CACHE_REGISTRY_SRC: &str = "src";
const CACHE_REGISTRY_CACHE: &str = "cache";
const CACHE_VET_LOCK: &str = ".vet-lock";
const CACHE_ALLOWED_FILES: &[&str] = &[
CACHE_DIFF_CACHE,
CACHE_COMMAND_HISTORY,
CACHE_EMPTY_PACKAGE,
CACHE_REGISTRY_SRC,
CACHE_REGISTRY_CACHE,
CACHE_VET_LOCK,
];
const CARGO_REGISTRY_SRC: &str = "src";
const CARGO_REGISTRY_CACHE: &str = "cache";
const CARGO_OK_FILE: &str = ".cargo-ok";
const CARGO_OK_BODY: &str = "ok";
pub const DEFAULT_STORE: &str = "supply-chain";
const AUDITS_TOML: &str = "audits.toml";
const CONFIG_TOML: &str = "config.toml";
const IMPORTS_LOCK: &str = "imports.lock";
const MAX_CONCURRENT_DIFFS: usize = 40;
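/// A lock held over the store directory for the lifetime of a [`Store`].
/// The flock is taken on `config.toml`; the other store files are accessed
/// relative to that file's parent directory.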
struct StoreLock {
config: FileLock,
}
impl StoreLock {
fn new(store: &Filesystem) -> Result<Self, FlockError> {
Ok(StoreLock {
config: store.open_rw(CONFIG_TOML, "vet store")?,
})
}
fn read_config(&self) -> io::Result<impl Read + '_> {
let mut file = self.config.file();
file.rewind()?;
Ok(file)
}
fn write_config(&self) -> io::Result<impl Write + '_> {
let mut file = self.config.file();
file.rewind()?;
file.set_len(0)?;
Ok(file)
}
fn read_audits(&self) -> io::Result<impl Read> {
File::open(self.config.parent().join(AUDITS_TOML))
}
fn write_audits(&self) -> io::Result<impl Write> {
File::create(self.config.parent().join(AUDITS_TOML))
}
fn read_imports(&self) -> io::Result<impl Read> {
File::open(self.config.parent().join(IMPORTS_LOCK))
}
fn write_imports(&self) -> io::Result<impl Write> {
File::create(self.config.parent().join(IMPORTS_LOCK))
}
}
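/// The supply-chain store (typically the `supply-chain/` directory),
/// holding the parsed `config.toml`, `audits.toml`, and `imports.lock`
/// along with their source text for diagnostics. While a `Store` exists it
/// holds the store lock, so the files should only be accessed through it.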
pub struct Store {
lock: Option<StoreLock>,
pub config: ConfigFile,
pub imports: ImportsFile,
pub audits: AuditsFile,
pub live_imports: Option<ImportsFile>,
pub config_src: SourceFile,
pub imports_src: SourceFile,
pub audits_src: SourceFile,
}
impl Store {
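/// Create a new store directory on disk with empty config, audits, and
/// imports, taking the store lock. The files themselves are only written
/// out on `commit`.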
pub fn create(cfg: &Config) -> Result<Self, StoreCreateError> {
let root = cfg.metacfg.store_path();
root.create_dir().map_err(StoreCreateError::CouldntCreate)?;
let lock = StoreLock::new(&root)?;
Ok(Self {
lock: Some(lock),
config: ConfigFile {
default_criteria: String::new(),
imports: SortedMap::new(),
policy: SortedMap::new(),
exemptions: SortedMap::new(),
},
imports: ImportsFile {
audits: SortedMap::new(),
},
audits: AuditsFile {
criteria: SortedMap::new(),
audits: SortedMap::new(),
},
live_imports: None,
config_src: Arc::new(NamedSource::new(CONFIG_TOML, "")),
audits_src: Arc::new(NamedSource::new(AUDITS_TOML, "")),
imports_src: Arc::new(NamedSource::new(IMPORTS_LOCK, "")),
})
}
pub fn is_init(metacfg: &MetaConfig) -> bool {
metacfg.store_path().as_path_unlocked().exists()
}
pub fn acquire_offline(cfg: &Config) -> Result<Self, StoreAcquireError> {
Self::acquire(cfg, None, false)
}
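/// Acquire the store lock and load the store from disk, validating it.
/// If we're not `--locked` and a network is available, also fetch the
/// configured imports and stage them in `live_imports` for the resolver.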
pub fn acquire(
cfg: &Config,
network: Option<&Network>,
allow_criteria_changes: bool,
) -> Result<Self, StoreAcquireError> {
let root = cfg.metacfg.store_path();
let lock = StoreLock::new(&root)?;
let (config_src, config): (_, ConfigFile) = load_toml(CONFIG_TOML, lock.read_config()?)?;
let (audits_src, audits): (_, AuditsFile) = load_toml(AUDITS_TOML, lock.read_audits()?)?;
let (imports_src, imports): (_, ImportsFile) =
load_toml(IMPORTS_LOCK, lock.read_imports()?)?;
let live_imports = if let (false, Some(network)) = (cfg.cli.locked, network) {
let fetched_audits = tokio::runtime::Handle::current()
.block_on(fetch_imported_audits(network, &config))?;
let live_imports =
process_imported_audits(fetched_audits, &config, &imports, allow_criteria_changes)?;
Some(live_imports)
} else {
None
};
let store = Self {
lock: Some(lock),
config,
audits,
imports,
live_imports,
config_src,
audits_src,
imports_src,
};
store.validate()?;
Ok(store)
}
#[cfg(test)]
pub fn mock(config: ConfigFile, audits: AuditsFile, imports: ImportsFile) -> Self {
Self {
lock: None,
config,
imports,
audits,
live_imports: None,
config_src: Arc::new(NamedSource::new(CONFIG_TOML, "")),
audits_src: Arc::new(NamedSource::new(AUDITS_TOML, "")),
imports_src: Arc::new(NamedSource::new(IMPORTS_LOCK, "")),
}
}
#[cfg(test)]
pub fn mock_online(
config: ConfigFile,
audits: AuditsFile,
imports: ImportsFile,
fetched_audits: Vec<(ImportName, AuditsFile)>,
allow_criteria_changes: bool,
) -> Result<Self, CriteriaChangeErrors> {
for (_, original_file) in &fetched_audits {
let orig_toml = to_formatted_toml(original_file).unwrap().to_string();
let result = foreign_audit_file_to_local(toml::de::from_str(&orig_toml).unwrap());
assert_eq!(result.ignored_criteria, Vec::<String>::new());
assert_eq!(result.ignored_audits, Vec::<String>::new());
let new_toml = to_formatted_toml(&result.audit_file).unwrap().to_string();
assert_eq!(new_toml, orig_toml);
}
let live_imports =
process_imported_audits(fetched_audits, &config, &imports, allow_criteria_changes)?;
Ok(Self {
lock: None,
config,
imports,
audits,
live_imports: Some(live_imports),
config_src: Arc::new(NamedSource::new(CONFIG_TOML, "")),
audits_src: Arc::new(NamedSource::new(AUDITS_TOML, "")),
imports_src: Arc::new(NamedSource::new(IMPORTS_LOCK, "")),
})
}
#[cfg(test)]
pub fn mock_acquire(
config: &str,
audits: &str,
imports: &str,
) -> Result<Self, StoreAcquireError> {
let (config_src, config): (_, ConfigFile) = load_toml(CONFIG_TOML, config.as_bytes())?;
let (audits_src, audits): (_, AuditsFile) = load_toml(AUDITS_TOML, audits.as_bytes())?;
let (imports_src, imports): (_, ImportsFile) = load_toml(IMPORTS_LOCK, imports.as_bytes())?;
let store = Self {
lock: None,
config,
imports,
audits,
live_imports: None,
config_src,
audits_src,
imports_src,
};
store.validate()?;
Ok(store)
}
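/// Make a lockless clone of the store for use by `suggest`, dropping any
/// exemptions marked `suggest = true` so they can be suggested again.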
pub fn clone_for_suggest(&self) -> Self {
let mut clone = Self {
lock: None,
config: self.config.clone(),
imports: self.imports.clone(),
audits: self.audits.clone(),
live_imports: self.live_imports.clone(),
config_src: self.config_src.clone(),
audits_src: self.audits_src.clone(),
imports_src: self.imports_src.clone(),
};
for versions in clone.config.exemptions.values_mut() {
versions.retain(|e| !e.suggest);
}
clone
}
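/// The imported audits, preferring freshly fetched `live_imports` over
/// the on-disk `imports.lock` when available.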
pub fn imported_audits(&self) -> &SortedMap<ImportName, AuditsFile> {
match &self.live_imports {
Some(live_imports) => &live_imports.audits,
None => &self.imports.audits,
}
}
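/// Write the store back to disk. Does nothing for mocked stores, which
/// hold no lock.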
pub fn commit(self) -> Result<(), StoreCommitError> {
if let Some(lock) = self.lock {
let audits = lock.write_audits()?;
let config = lock.write_config()?;
let imports = lock.write_imports()?;
store_audits(audits, self.audits)?;
store_config(config, self.config)?;
store_imports(imports, self.imports)?;
}
Ok(())
}
#[cfg(test)]
pub fn mock_commit(&self) -> SortedMap<String, String> {
let mut audits = Vec::new();
let mut config = Vec::new();
let mut imports = Vec::new();
store_audits(&mut audits, self.audits.clone()).unwrap();
store_config(&mut config, self.config.clone()).unwrap();
store_imports(&mut imports, self.imports.clone()).unwrap();
[
(AUDITS_TOML.to_owned(), String::from_utf8(audits).unwrap()),
(CONFIG_TOML.to_owned(), String::from_utf8(config).unwrap()),
(IMPORTS_LOCK.to_owned(), String::from_utf8(imports).unwrap()),
]
.into_iter()
.collect()
}
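/// Validate the store, checking that every criteria name referenced in
/// the config and audits files actually exists and that `imports.lock`
/// is in sync with the configured imports.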
#[allow(clippy::for_kv_map)]
pub fn validate(&self) -> Result<(), StoreValidateErrors> {
fn check_criteria(
source_code: &SourceFile,
valid: &Arc<Vec<CriteriaName>>,
errors: &mut Vec<InvalidCriteriaError>,
criteria: &[Spanned<CriteriaName>],
) {
for criteria in criteria {
if !valid.contains(criteria) {
errors.push(InvalidCriteriaError {
source_code: source_code.clone(),
span: Spanned::span(criteria),
invalid: criteria.to_string(),
valid_names: valid.clone(),
})
}
}
}
let valid_criteria = Arc::new(
self.audits
.criteria
.iter()
.map(|(c, _)| &**c)
.chain([SAFE_TO_RUN, SAFE_TO_DEPLOY])
.map(|name| name.to_string())
.collect::<Vec<_>>(),
);
let no_criteria = vec![];
let mut invalid_criteria_errors = vec![];
for (_package, entries) in &self.config.exemptions {
for entry in entries {
check_criteria(
&self.config_src,
&valid_criteria,
&mut invalid_criteria_errors,
&entry.criteria,
);
for (_dep_package, dep_criteria) in &entry.dependency_criteria {
check_criteria(
&self.config_src,
&valid_criteria,
&mut invalid_criteria_errors,
dep_criteria,
);
}
}
}
for (_package, policy) in &self.config.policy {
check_criteria(
&self.config_src,
&valid_criteria,
&mut invalid_criteria_errors,
policy.criteria.as_ref().unwrap_or(&no_criteria),
);
check_criteria(
&self.config_src,
&valid_criteria,
&mut invalid_criteria_errors,
policy.dev_criteria.as_ref().unwrap_or(&no_criteria),
);
for (_dep_package, dep_criteria) in &policy.dependency_criteria {
check_criteria(
&self.config_src,
&valid_criteria,
&mut invalid_criteria_errors,
dep_criteria,
);
}
}
for (_new_criteria, entry) in &self.audits.criteria {
check_criteria(
&self.audits_src,
&valid_criteria,
&mut invalid_criteria_errors,
&entry.implies,
);
}
for (_package, entries) in &self.audits.audits {
for entry in entries {
check_criteria(
&self.audits_src,
&valid_criteria,
&mut invalid_criteria_errors,
&entry.criteria,
);
match &entry.kind {
AuditKind::Full {
dependency_criteria,
..
}
| AuditKind::Delta {
dependency_criteria,
..
} => {
for (_dep_package, dep_criteria) in dependency_criteria {
check_criteria(
&self.audits_src,
&valid_criteria,
&mut invalid_criteria_errors,
dep_criteria,
);
}
}
AuditKind::Violation { .. } => {}
}
}
}
let imports_lock_outdated = self
.imports_lock_outdated()
.then(|| StoreValidateError::ImportsLockOutdated);
let errors = invalid_criteria_errors
.into_iter()
.map(StoreValidateError::InvalidCriteria)
.chain(imports_lock_outdated)
.collect::<Vec<_>>();
if !errors.is_empty() {
return Err(StoreValidateErrors { errors });
}
Ok(())
}
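/// Check whether `imports.lock` is out of sync with `config.toml`, which
/// can only be repaired by a non-`--locked` run. With live imports this
/// is never the case, as the lock is regenerated on commit.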
fn imports_lock_outdated(&self) -> bool {
if self.live_imports.is_some() {
return false;
}
if self.config.imports.keys().ne(self.imports.audits.keys()) {
return true;
}
for (import_name, config) in &self.config.imports {
let audits_file = self.imports.audits.get(import_name).unwrap();
for crate_name in &config.exclude {
if audits_file.audits.contains_key(crate_name) {
return true;
}
}
}
false
}
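/// Compute the updated `imports.lock` contents after a resolve. Fresh
/// audits are only written back for packages that needed them (or for
/// every package if `force_update` is set), except violations, which are
/// always recorded. Without live imports, the existing lock is returned
/// unchanged.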
#[must_use]
pub fn get_updated_imports_file(
&self,
graph: &resolver::DepGraph<'_>,
conclusion: &resolver::Conclusion,
force_update: bool,
) -> ImportsFile {
if self.live_imports.is_none() {
return self.imports.clone();
}
let mut used_packages = SortedMap::new();
for package in graph.nodes.iter() {
used_packages.insert(package.name, false);
}
if let Conclusion::Success(success) = conclusion {
for &node_idx in &success.needed_fresh_imports {
let package = &graph.nodes[node_idx];
used_packages.insert(package.name, true);
}
}
let mut new_imports = ImportsFile {
audits: SortedMap::new(),
};
for import_name in self.config.imports.keys() {
let live_audit_file = self
.imported_audits()
.get(import_name)
.expect("Live audits missing for import?");
let first_import = !self.imports.audits.contains_key(import_name);
let mut new_audits_file = AuditsFile {
criteria: live_audit_file.criteria.clone(),
audits: SortedMap::new(),
};
for (&package, &need_fresh_imports) in &used_packages {
let update_package = first_import || need_fresh_imports || force_update;
let audits = live_audit_file
.audits
.get(package)
.map(|v| &v[..])
.unwrap_or(&[])
.iter()
.filter(|audit| {
update_package
|| !audit.is_fresh_import
|| matches!(audit.kind, AuditKind::Violation { .. })
})
.cloned()
.map(|mut audit| {
audit.is_fresh_import = false;
audit
})
.collect::<Vec<_>>();
if !audits.is_empty() {
new_audits_file.audits.insert(package.to_owned(), audits);
}
}
new_imports
.audits
.insert(import_name.to_owned(), new_audits_file);
}
new_imports
}
}
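/// Merge freshly fetched audit files with the existing `imports.lock`,
/// marking entries not already in the lock as fresh imports and erroring
/// out if any criteria description changed (unless explicitly allowed).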
fn process_imported_audits(
fetched_audits: Vec<(ImportName, AuditsFile)>,
config_file: &ConfigFile,
imports_lock: &ImportsFile,
allow_criteria_changes: bool,
) -> Result<ImportsFile, CriteriaChangeErrors> {
let mut new_imports = ImportsFile {
audits: SortedMap::new(),
};
let mut changed_criteria = Vec::new();
for (import_name, mut audits_file) in fetched_audits {
let config = config_file
.imports
.get(&import_name)
.expect("fetched audit without config?");
for excluded in &config.exclude {
audits_file.audits.remove(excluded);
}
for audit_entry in audits_file.audits.values_mut().flat_map(|v| v.iter_mut()) {
audit_entry.is_fresh_import = true;
}
if let Some(existing_audits_file) = imports_lock.audits.get(&import_name) {
if !allow_criteria_changes {
for (criteria_name, old_entry) in &existing_audits_file.criteria {
if let Some(new_entry) = audits_file.criteria.get(criteria_name) {
let old_desc = old_entry.description.as_ref().unwrap();
let new_desc = new_entry.description.as_ref().unwrap();
if old_desc != new_desc {
changed_criteria.push(CriteriaChangeError {
import_name: import_name.clone(),
criteria_name: criteria_name.to_owned(),
old_desc: old_desc.clone(),
new_desc: new_desc.clone(),
});
}
}
}
}
for (package, existing_audits) in &existing_audits_file.audits {
let new_audits = audits_file
.audits
.get_mut(package)
.map(|v| &mut v[..])
.unwrap_or(&mut []);
for existing_audit in existing_audits {
for new_audit in &mut *new_audits {
if new_audit.is_fresh_import
&& new_audit.kind == existing_audit.kind
&& new_audit.criteria == existing_audit.criteria
{
new_audit.is_fresh_import = false;
break;
}
}
}
}
}
new_imports.audits.insert(import_name, audits_file);
}
if !changed_criteria.is_empty() {
return Err(CriteriaChangeErrors {
errors: changed_criteria,
});
}
Ok(new_imports)
}
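/// Fetch all audit files configured under `[imports]`, in parallel.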
async fn fetch_imported_audits(
network: &Network,
config: &ConfigFile,
) -> Result<Vec<(ImportName, AuditsFile)>, FetchAuditError> {
let progress_bar = progress_bar("Fetching", "imported audits", config.imports.len() as u64);
try_join_all(config.imports.iter().map(|(name, import)| async {
let _guard = IncProgressOnDrop(&progress_bar, 1);
let audit_file = fetch_imported_audit(network, name, &import.url).await?;
Ok::<_, FetchAuditError>((name.clone(), audit_file))
}))
.await
}
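/// Fetch a single imported audits file, convert it to our local format
/// (dropping entries we can't parse, with a warning), and inline any
/// criteria descriptions that are only available by URL.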
async fn fetch_imported_audit(
network: &Network,
name: &str,
url: &str,
) -> Result<AuditsFile, FetchAuditError> {
let parsed_url = Url::parse(url).map_err(|error| FetchAuditError::InvalidUrl {
import_url: url.to_owned(),
import_name: name.to_owned(),
error,
})?;
let audit_bytes = network.download(parsed_url).await?;
let audit_string = String::from_utf8(audit_bytes).map_err(LoadTomlError::from)?;
let audit_source = Arc::new(NamedSource::new(name, audit_string.clone()));
let foreign_audit_file: ForeignAuditsFile = toml::de::from_str(&audit_string)
.map_err(|error| {
let (line, col) = error.line_col().unwrap_or((0, 0));
TomlParseError {
source_code: audit_source,
span: SourceOffset::from_location(&audit_string, line + 1, col + 1),
error,
}
})
.map_err(LoadTomlError::from)?;
let ForeignAuditFileToLocalResult {
mut audit_file,
ignored_criteria,
ignored_audits,
} = foreign_audit_file_to_local(foreign_audit_file);
if !ignored_criteria.is_empty() {
warn!(
"Ignored {} invalid criteria entries when importing from '{}'\n\
These criteria may have been made with a more recent version of cargo-vet",
ignored_criteria.len(),
name
);
info!(
"The following criteria were ignored when importing from '{}': {:?}",
name, ignored_criteria
);
}
if !ignored_audits.is_empty() {
warn!(
"Ignored {} invalid audits when importing from '{}'\n\
These audits may have been made with a more recent version of cargo-vet",
ignored_audits.len(),
name
);
info!(
"Audits for the following packages were ignored when importing from '{}': {:?}",
name, ignored_audits
);
}
try_join_all(
audit_file
.criteria
.iter_mut()
.map(|(criteria_name, criteria_entry)| async {
if criteria_entry.description.is_some() {
return Ok(());
}
let url_string = criteria_entry.description_url.as_ref().ok_or_else(|| {
FetchAuditError::MissingCriteriaDescription {
import_name: name.to_owned(),
criteria_name: criteria_name.clone(),
}
})?;
let url = Url::parse(url_string).map_err(|error| {
FetchAuditError::InvalidCriteriaDescriptionUrl {
import_name: name.to_owned(),
criteria_name: criteria_name.clone(),
url: url_string.clone(),
error,
}
})?;
let bytes = network.download(url.clone()).await?;
let description =
String::from_utf8(bytes).map_err(|error| DownloadError::InvalidText {
url: url.clone(),
error,
})?;
criteria_entry.description = Some(description);
Ok::<(), FetchAuditError>(())
}),
)
.await?;
Ok(audit_file)
}
pub(crate) struct ForeignAuditFileToLocalResult {
pub audit_file: AuditsFile,
pub ignored_criteria: Vec<CriteriaName>,
pub ignored_audits: Vec<PackageName>,
}
fn is_known_criteria(valid_criteria: &[CriteriaName], criteria_name: &CriteriaName) -> bool {
criteria_name == format::SAFE_TO_RUN
|| criteria_name == format::SAFE_TO_DEPLOY
|| valid_criteria.contains(criteria_name)
}
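/// Convert a foreign audits file to our local format, discarding any
/// criteria or audits that fail to parse (e.g. because they were written
/// by a newer cargo-vet) and recording what was ignored.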
pub(crate) fn foreign_audit_file_to_local(
foreign_audit_file: ForeignAuditsFile,
) -> ForeignAuditFileToLocalResult {
let mut ignored_criteria = Vec::new();
let mut criteria: SortedMap<CriteriaName, CriteriaEntry> = foreign_audit_file
.criteria
.into_iter()
.filter_map(|(criteria, value)| match parse_imported_criteria(value) {
Some(entry) => Some((criteria, entry)),
None => {
ignored_criteria.push(criteria);
None
}
})
.collect();
let valid_criteria: Vec<CriteriaName> = criteria.keys().cloned().collect();
for entry in criteria.values_mut() {
entry
.implies
.retain(|criteria_name| is_known_criteria(&valid_criteria, criteria_name));
}
let mut ignored_audits = Vec::new();
let audits: AuditedDependencies = foreign_audit_file
.audits
.into_iter()
.map(|(package, audits)| {
let parsed: Vec<_> = audits
.into_iter()
.filter_map(|value| match parse_imported_audit(&valid_criteria, value) {
Some(audit) => Some(audit),
None => {
ignored_audits.push(package.clone());
None
}
})
.collect();
(package, parsed)
})
.filter(|(_, audits)| !audits.is_empty())
.collect();
ForeignAuditFileToLocalResult {
audit_file: AuditsFile { criteria, audits },
ignored_criteria,
ignored_audits,
}
}
fn parse_imported_criteria(value: toml::Value) -> Option<CriteriaEntry> {
parse_from_value(value)
.map_err(|err| info!("imported criteria parsing failed due to {err}"))
.ok()
}
fn parse_imported_audit(valid_criteria: &[CriteriaName], value: toml::Value) -> Option<AuditEntry> {
let mut audit: AuditEntry = parse_from_value(value)
.map_err(|err| info!("imported audit parsing failed due to {err}"))
.ok()?;
audit
.criteria
.retain(|criteria_name| is_known_criteria(valid_criteria, criteria_name));
if audit.criteria.is_empty() {
info!("imported audit parsing failed due to no known criteria");
return None;
}
match &audit.kind {
AuditKind::Delta {
dependency_criteria,
..
}
| AuditKind::Full {
dependency_criteria,
..
} => {
for criteria_name in dependency_criteria.values().flatten() {
if !is_known_criteria(valid_criteria, criteria_name) {
info!("imported audit parsing failed due to unknown dependency criteria: {criteria_name}");
return None;
}
}
}
_ => {}
}
Some(audit)
}
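/// Paths into the user's cargo registry, used to reuse crates that cargo
/// has already downloaded or unpacked.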
pub struct CargoRegistry {
index: Index,
base_dir: PathBuf,
registry: OsString,
}
impl CargoRegistry {
pub fn src(&self) -> PathBuf {
self.base_dir.join(CARGO_REGISTRY_SRC).join(&self.registry)
}
pub fn cache(&self) -> PathBuf {
self.base_dir
.join(CARGO_REGISTRY_CACHE)
.join(&self.registry)
}
}
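/// Mutable cache state, kept behind a mutex: the persistent diff-cache
/// and command history, plus per-run memoization of package fetches and
/// diffstats.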
struct CacheState {
diff_cache: DiffCache,
command_history: CommandHistory,
fetched_packages: FastMap<(String, Version), Arc<tokio::sync::OnceCell<PathBuf>>>,
diffed: FastMap<(String, Delta), Arc<tokio::sync::OnceCell<DiffStat>>>,
}
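/// The on-disk cache (in the user's cache directory), holding downloaded
/// and unpacked packages, the diff-cache, and command history. A `None`
/// root means the cache is mocked for tests.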
pub struct Cache {
_lock: Option<FileLock>,
root: Option<PathBuf>,
cargo_registry: Option<CargoRegistry>,
diff_cache_path: Option<PathBuf>,
command_history_path: Option<PathBuf>,
diff_semaphore: tokio::sync::Semaphore,
state: Mutex<CacheState>,
}
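// On drop, persist the diff-cache and command-history back to disk.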
impl Drop for Cache {
fn drop(&mut self) {
let state = self.state.get_mut().unwrap();
if let Some(diff_cache_path) = &self.diff_cache_path {
if let Err(err) = || -> Result<(), CacheCommitError> {
store_diff_cache(
File::create(diff_cache_path)?,
mem::take(&mut state.diff_cache),
)?;
Ok(())
}() {
error!("error writing back changes to diff-cache: {:?}", err);
}
}
if let Some(command_history_path) = &self.command_history_path {
if let Err(err) = || -> Result<(), CacheCommitError> {
store_command_history(
File::create(command_history_path)?,
mem::take(&mut state.command_history),
)?;
Ok(())
}() {
error!("error writing back changes to diff-cache: {:?}", err);
}
}
}
}
impl Cache {
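/// Acquire the cache: take the cache lock, create the directory layout
/// if needed, and load the diff-cache and command history (falling back
/// to defaults if they're missing or corrupt). With `mock_cache` set, a
/// fully in-memory cache is returned instead.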
pub fn acquire(cfg: &PartialConfig) -> Result<Self, CacheAcquireError> {
if cfg.mock_cache {
return Ok(Cache {
_lock: None,
root: None,
cargo_registry: None,
diff_cache_path: None,
command_history_path: None,
diff_semaphore: tokio::sync::Semaphore::new(MAX_CONCURRENT_DIFFS),
state: Mutex::new(CacheState {
diff_cache: DiffCache::default(),
command_history: CommandHistory::default(),
fetched_packages: FastMap::new(),
diffed: FastMap::new(),
}),
});
}
let root = cfg.cache_dir.clone();
fs::create_dir_all(&root).map_err(|error| CacheAcquireError::Root {
target: root.clone(),
error,
})?;
let lock = Filesystem::new(root.clone()).open_rw(CACHE_VET_LOCK, "cache lock")?;
let empty = root.join(CACHE_EMPTY_PACKAGE);
fs::create_dir_all(&empty).map_err(|error| CacheAcquireError::Empty {
target: empty.clone(),
error,
})?;
let packages_src = root.join(CACHE_REGISTRY_SRC);
fs::create_dir_all(&packages_src).map_err(|error| CacheAcquireError::Src {
target: packages_src.clone(),
error,
})?;
let packages_cache = root.join(CACHE_REGISTRY_CACHE);
fs::create_dir_all(&packages_cache).map_err(|error| CacheAcquireError::Cache {
target: packages_cache.clone(),
error,
})?;
let diff_cache_path = cfg
.cli
.diff_cache
.clone()
.unwrap_or_else(|| root.join(CACHE_DIFF_CACHE));
let diff_cache: DiffCache = File::open(&diff_cache_path)
.ok()
.and_then(|f| load_toml(CACHE_DIFF_CACHE, f).map(|v| v.1).ok())
.unwrap_or_default();
let command_history_path = root.join(CACHE_COMMAND_HISTORY);
let command_history: CommandHistory = File::open(&command_history_path)
.ok()
.and_then(|f| load_json(f).ok())
.unwrap_or_default();
let cargo_registry = find_cargo_registry();
if let Err(e) = &cargo_registry {
warn!("Couldn't find cargo registry: {e}");
}
Ok(Self {
_lock: Some(lock),
root: Some(root),
diff_cache_path: Some(diff_cache_path),
command_history_path: Some(command_history_path),
cargo_registry: cargo_registry.ok(),
diff_semaphore: tokio::sync::Semaphore::new(MAX_CONCURRENT_DIFFS),
state: Mutex::new(CacheState {
diff_cache,
command_history,
fetched_packages: FastMap::new(),
diffed: FastMap::new(),
}),
})
}
#[cfg(not(test))]
pub fn query_package_from_index(&self, name: PackageStr) -> Option<crates_index::Crate> {
let reg = self.cargo_registry.as_ref()?;
reg.index.crate_(name)
}
#[cfg(test)]
pub fn query_package_from_index(&self, name: PackageStr) -> Option<crates_index::Crate> {
if let Some(reg) = self.cargo_registry.as_ref() {
reg.index.crate_(name)
} else {
crate::tests::MockRegistry::testing_cinematic_universe().package(name)
}
}
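/// Fetch a package's source to disk, preferring an already-unpacked copy
/// in cargo's registry, then our own cache, and finally downloading the
/// crate from crates.io. Returns the path to the unpacked sources.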
#[tracing::instrument(skip(self, network), err)]
pub async fn fetch_package(
&self,
network: Option<&Network>,
package: PackageStr<'_>,
version: &Version,
) -> Result<PathBuf, FetchError> {
let once_cell = {
let mut guard = self.state.lock().unwrap();
guard
.fetched_packages
.entry((package.to_owned(), version.clone()))
.or_default()
.clone()
};
let path_res: Result<_, FetchError> = once_cell
.get_or_try_init(|| async {
let root = self.root.as_ref().unwrap();
let dir_name = format!("{}-{}", package, version);
if let Some(reg) = self.cargo_registry.as_ref() {
let fetched_src = reg.src().join(&dir_name);
if fetch_is_ok(&fetched_src).await {
return Ok(fetched_src);
}
}
let fetched_package = root
.join(CACHE_REGISTRY_CACHE)
.join(format!("{}.crate", dir_name));
let fetched_src = root.join(CACHE_REGISTRY_SRC).join(&dir_name);
let fetched_package_ = fetched_package.clone();
let cached_file = tokio::task::spawn_blocking(move || {
File::open(&fetched_package_).map(|file| {
let now = filetime::FileTime::now();
if let Err(err) =
filetime::set_file_handle_times(&file, Some(now), Some(now))
{
warn!(
"failed to update mtime for {}, gc may not function correctly: {}",
fetched_package_.display(),
err
);
}
file
})
})
.await
.expect("failed to join");
let file = match cached_file {
Ok(file) => file,
Err(_) => {
let network = network.ok_or_else(|| FetchError::Frozen {
package: package.to_owned(),
version: version.clone(),
})?;
let url =
format!("https://crates.io/api/v1/crates/{package}/{version}/download");
let url = Url::parse(&url).map_err(|error| FetchError::InvalidUrl {
url: url.clone(),
error,
})?;
info!(
"downloading package {}:{} from {} to {}",
package,
version,
url,
fetched_package.display()
);
network.download_and_persist(url, &fetched_package).await?;
let fetched_package_ = fetched_package.clone();
tokio::task::spawn_blocking(move || File::open(&fetched_package_))
.await
.expect("failed to join")
.map_err(|error| FetchError::OpenCached {
target: fetched_package.clone(),
error,
})?
}
};
if fetch_is_ok(&fetched_src).await {
Ok(fetched_src)
} else {
info!(
"unpacking package {}:{} from {} to {}",
package,
version,
fetched_package.display(),
fetched_src.display()
);
tokio::task::spawn_blocking(move || {
unpack_package(&file, &fetched_src)
.map(|_| fetched_src)
.map_err(|error| FetchError::Unpack {
src: fetched_package.clone(),
error,
})
})
.await
.expect("failed to join")
}
})
.await;
let path = path_res?;
Ok(path.to_owned())
}
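/// Run `git diff --no-index --shortstat` over two unpacked package trees
/// and parse out the total added + removed lines, bounded by the diff
/// semaphore.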
#[tracing::instrument(skip_all, err)]
async fn diffstat_package(
&self,
version1: &Path,
version2: &Path,
) -> Result<DiffStat, DiffError> {
let _permit = self
.diff_semaphore
.acquire()
.await
.expect("Semaphore dropped?!");
trace!("diffstating {version1:#?} {version2:#?}");
let out = tokio::process::Command::new("git")
.arg("diff")
.arg("--no-index")
.arg("--shortstat")
.arg(version1)
.arg(version2)
.output()
.await
.map_err(CommandError::CommandFailed)?;
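// `git diff --no-index` exits with 0 when the trees match and 1 when
// they differ; anything else is a real failure.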
let status = out.status.code().unwrap_or(-1);
if status != 0 && status != 1 {
return Err(CommandError::BadStatus(status).into());
}
let diffstat = String::from_utf8(out.stdout).map_err(CommandError::BadOutput)?;
let count = if diffstat.is_empty() {
0
} else {
let mut parts = diffstat.split(',');
parts.next().unwrap();
fn parse_diffnum(part: Option<&str>) -> Option<u64> {
part?.trim().split_once(' ')?.0.parse().ok()
}
let added: u64 = parse_diffnum(parts.next()).unwrap_or(0);
let removed: u64 = parse_diffnum(parts.next()).unwrap_or(0);
assert_eq!(
parts.next(),
None,
"diffstat had more parts than expected? {}",
diffstat
);
added + removed
};
Ok(DiffStat {
raw: diffstat,
count,
})
}
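/// Fetch both ends of a delta and compute its diffstat, memoizing the
/// result in the persistent diff-cache. When the cache is mocked (no
/// root), a deterministic fake diffstat is derived from the version
/// numbers instead.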
#[tracing::instrument(skip(self, network), err)]
pub async fn fetch_and_diffstat_package(
&self,
network: Option<&Network>,
package: PackageStr<'_>,
delta: &Delta,
) -> Result<DiffStat, FetchAndDiffError> {
let once_cell = {
let mut guard = self.state.lock().unwrap();
let DiffCache::V1 { diffs } = &guard.diff_cache;
if let Some(cached) = diffs
.get(package)
.and_then(|cache| cache.get(delta))
.cloned()
{
return Ok(cached);
}
if self.root.is_none() {
warn!("Missing root, assuming we're in tests and mocking");
let from_len = match &delta.from {
Some(from) => from.major * from.major,
None => 0,
};
let to_len: u64 = delta.to.major * delta.to.major;
let diff = to_len as i64 - from_len as i64;
let count = diff.unsigned_abs();
let raw = if diff < 0 {
format!("-{}", count)
} else {
format!("+{}", count)
};
return Ok(DiffStat { raw, count });
}
guard
.diffed
.entry((package.to_owned(), delta.clone()))
.or_default()
.clone()
};
let diffstat = once_cell
.get_or_try_init(|| async {
let from = match &delta.from {
Some(from) => self.fetch_package(network, package, from).await?,
None => self.root.as_ref().unwrap().join(CACHE_EMPTY_PACKAGE),
};
let to = self.fetch_package(network, package, &delta.to).await?;
let diffstat = self.diffstat_package(&from, &to).await?;
{
let mut guard = self.state.lock().unwrap();
let DiffCache::V1 { diffs } = &mut guard.diff_cache;
diffs
.entry(package.to_string())
.or_insert(SortedMap::new())
.insert(delta.clone(), diffstat.clone());
}
Ok::<_, FetchAndDiffError>(diffstat)
})
.await?;
Ok(diffstat.clone())
}
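/// Garbage-collect the cache: delete unknown files in the cache root,
/// everything in the empty-package dir, and any cached packages older
/// than `max_package_age` (along with stale or corrupt unpacked sources).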
pub async fn gc(&self, max_package_age: Duration) {
if self.root.is_none() {
return;
}
let (root_rv, empty_rv, packages_rv) = tokio::join!(
self.gc_root(),
self.gc_empty(),
self.gc_packages(max_package_age)
);
if let Err(err) = root_rv {
error!("gc: performing gc on the cache root failed: {err}");
}
if let Err(err) = empty_rv {
error!("gc: performing gc on the empty package failed: {err}");
}
if let Err(err) = packages_rv {
error!("gc: performing gc on the package cache failed: {err}");
}
}
pub fn gc_sync(&self, max_package_age: Duration) {
tokio::runtime::Handle::current().block_on(self.gc(max_package_age));
}
async fn gc_root(&self) -> Result<(), io::Error> {
let root = self.root.as_ref().unwrap();
let mut root_entries = tokio::fs::read_dir(root).await?;
while let Some(entry) = root_entries.next_entry().await? {
if !entry
.file_name()
.to_str()
.map_or(false, |name| CACHE_ALLOWED_FILES.contains(&name))
{
remove_dir_entry(&entry).await?;
}
}
Ok(())
}
async fn gc_empty(&self) -> Result<(), std::io::Error> {
let empty = self.root.as_ref().unwrap().join(CACHE_EMPTY_PACKAGE);
let mut empty_entries = tokio::fs::read_dir(&empty).await?;
while let Some(entry) = empty_entries.next_entry().await? {
remove_dir_entry(&entry).await?;
}
Ok(())
}
async fn gc_packages(&self, max_package_age: Duration) -> Result<(), io::Error> {
let cache = self.root.as_ref().unwrap().join(CACHE_REGISTRY_CACHE);
let src = self.root.as_ref().unwrap().join(CACHE_REGISTRY_SRC);
let mut kept_packages = Vec::new();
let mut cache_entries = tokio::fs::read_dir(&cache).await?;
while let Some(entry) = cache_entries.next_entry().await? {
if let Some(to_keep) = should_keep_package(&entry, max_package_age).await {
kept_packages.push(to_keep);
} else {
remove_dir_entry(&entry).await?;
}
}
let mut src_entries = tokio::fs::read_dir(&src).await?;
while let Some(entry) = src_entries.next_entry().await? {
if !kept_packages.contains(&entry.file_name()) || !fetch_is_ok(&entry.path()).await {
remove_dir_entry(&entry).await?;
}
}
Ok(())
}
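/// Delete everything in the cache root except the lock file, also
/// resetting the in-memory diff-cache and command history.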
pub async fn clean(&self) -> Result<(), io::Error> {
let root = self.root.as_ref().expect("cannot clean a mocked cache");
{
let mut guard = self.state.lock().unwrap();
guard.command_history = Default::default();
guard.diff_cache = Default::default();
}
let mut root_entries = tokio::fs::read_dir(&root).await?;
while let Some(entry) = root_entries.next_entry().await? {
if entry.file_name() != Path::new(CACHE_VET_LOCK) {
remove_dir_entry(&entry).await?;
}
}
Ok(())
}
pub fn clean_sync(&self) -> Result<(), io::Error> {
tokio::runtime::Handle::current().block_on(self.clean())
}
pub fn get_last_fetch(&self) -> Option<FetchCommand> {
let guard = self.state.lock().unwrap();
guard.command_history.last_fetch.clone()
}
pub fn set_last_fetch(&self, last_fetch: FetchCommand) {
let mut guard = self.state.lock().unwrap();
guard.command_history.last_fetch = Some(last_fetch);
}
}
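/// Find the index entry exactly matching `target_version`, if any.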
pub fn exact_version<'a>(
this: &'a crates_index::Crate,
target_version: &Version,
) -> Option<&'a crates_index::Version> {
for index_version in this.versions() {
if let Ok(index_ver) = index_version.version().parse::<cargo_metadata::Version>() {
if &index_ver == target_version {
return Some(index_version);
}
}
}
None
}
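/// Unpack a downloaded `.crate` tarball into `unpack_dir`, refusing
/// entries that would escape the expected `name-version/` prefix, and
/// finish by writing a `.cargo-ok` file like cargo does so partial
/// unpacks can be detected.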
#[tracing::instrument(err)]
fn unpack_package(tarball: &File, unpack_dir: &Path) -> Result<(), UnpackError> {
if unpack_dir.exists() {
fs::remove_dir_all(unpack_dir)?;
}
fs::create_dir(unpack_dir)?;
let lockfile = unpack_dir.join(CARGO_OK_FILE);
let gz = GzDecoder::new(tarball);
let mut tar = Archive::new(gz);
let prefix = unpack_dir.file_name().unwrap();
let parent = unpack_dir.parent().unwrap();
for entry in tar.entries()? {
let mut entry = entry.map_err(UnpackError::ArchiveIterate)?;
let entry_path = entry
.path()
.map_err(UnpackError::ArchiveEntry)?
.into_owned();
if !entry_path.starts_with(prefix) {
return Err(UnpackError::InvalidPaths {
entry_path,
prefix: prefix.to_owned(),
});
}
entry
.unpack_in(parent)
.map_err(|error| UnpackError::Unpack {
entry_path: entry_path.clone(),
error,
})?;
}
let mut ok = OpenOptions::new()
.create(true)
.read(true)
.write(true)
.open(&lockfile)
.map_err(|error| UnpackError::LockCreate {
target: lockfile.clone(),
error,
})?;
write!(ok, "ok").map_err(|error| UnpackError::LockCreate {
target: lockfile.clone(),
error,
})?;
Ok(())
}
async fn fetch_is_ok(fetch: &Path) -> bool {
match tokio::fs::read_to_string(fetch.join(CARGO_OK_FILE)).await {
Ok(ok) => ok == CARGO_OK_BODY,
Err(_) => false,
}
}
async fn remove_dir_entry(entry: &tokio::fs::DirEntry) -> Result<(), io::Error> {
info!("gc: removing {}", entry.path().display());
let file_type = entry.file_type().await?;
if file_type.is_dir() {
tokio::fs::remove_dir_all(entry.path()).await?;
} else {
tokio::fs::remove_file(entry.path()).await?;
}
Ok(())
}
async fn get_file_age(entry: &tokio::fs::DirEntry) -> Option<Duration> {
let now = SystemTime::now();
let meta = entry.metadata().await.ok()?;
now.duration_since(meta.modified().ok()?).ok()
}
async fn should_keep_package(
entry: &tokio::fs::DirEntry,
max_package_age: Duration,
) -> Option<OsString> {
let path = entry.path();
let stem = path.file_stem()?;
if path.extension()? != OsStr::new("crate") {
return None;
}
match get_file_age(entry).await {
Some(age) if age > max_package_age => None,
_ => Some(stem.to_owned()),
}
}
fn find_cargo_registry() -> Result<CargoRegistry, crates_index::Error> {
let index = Index::new_cargo_default()?;
let base_dir = index.path().parent().unwrap().parent().unwrap().to_owned();
let registry = index.path().file_name().unwrap().to_owned();
Ok(CargoRegistry {
index,
base_dir,
registry,
})
}
fn load_toml<T>(file_name: &str, reader: impl Read) -> Result<(SourceFile, T), LoadTomlError>
where
T: for<'a> Deserialize<'a>,
{
let mut reader = BufReader::new(reader);
let mut string = String::new();
reader.read_to_string(&mut string)?;
let result = toml::de::from_str(&string);
match result {
Ok(toml) => Ok((Arc::new(NamedSource::new(file_name, string)), toml)),
Err(error) => {
let (line, col) = error.line_col().unwrap_or((0, 0));
let span = SourceOffset::from_location(&string, line + 1, col + 1);
Err(TomlParseError {
source_code: Arc::new(NamedSource::new(file_name, string)),
span,
error,
}
.into())
}
}
}
fn store_toml<T>(mut writer: impl Write, heading: &str, val: T) -> Result<(), StoreTomlError>
where
T: Serialize,
{
let toml_document = to_formatted_toml(val)?;
writeln!(writer, "{}{}", heading, toml_document)?;
Ok(())
}
fn load_json<T>(reader: impl Read) -> Result<T, LoadJsonError>
where
T: for<'a> Deserialize<'a>,
{
let mut reader = BufReader::new(reader);
let mut string = String::new();
reader.read_to_string(&mut string)?;
let json = serde_json::from_str(&string).map_err(|error| JsonParseError { error })?;
Ok(json)
}
fn store_json<T>(mut writer: impl Write, val: T) -> Result<(), StoreJsonError>
where
T: Serialize,
{
let json_string = serde_json::to_string(&val)?;
writeln!(writer, "{}", json_string)?;
Ok(())
}
fn store_audits(writer: impl Write, mut audits: AuditsFile) -> Result<(), StoreTomlError> {
let heading = r###"
# cargo-vet audits file
"###;
audits
.audits
.values_mut()
.for_each(|entries| entries.sort());
store_toml(writer, heading, audits)?;
Ok(())
}
fn store_config(writer: impl Write, mut config: ConfigFile) -> Result<(), StoreTomlError> {
config
.exemptions
.values_mut()
.for_each(|entries| entries.sort());
let heading = r###"
# cargo-vet config file
"###;
store_toml(writer, heading, config)?;
Ok(())
}
fn store_imports(writer: impl Write, imports: ImportsFile) -> Result<(), StoreTomlError> {
let heading = r###"
# cargo-vet imports lock
"###;
store_toml(writer, heading, imports)?;
Ok(())
}
fn store_diff_cache(writer: impl Write, diff_cache: DiffCache) -> Result<(), StoreTomlError> {
let heading = "";
store_toml(writer, heading, diff_cache)?;
Ok(())
}
fn store_command_history(
writer: impl Write,
command_history: CommandHistory,
) -> Result<(), StoreJsonError> {
store_json(writer, command_history)?;
Ok(())
}