use super::{CodebaseData, git, source_file_analyzer};
use crate::Result;
use crate::facts::ProviderResult;
use crate::facts::cache::{Cache, CacheResult};
use crate::facts::codebase::github_workflow_analyzer::{GitHubWorkflowInfo, sniff_github_workflows};
use crate::facts::crate_spec::{self, CrateSpec};
use crate::facts::path_utils::sanitize_path_component;
use crate::facts::repo_spec::RepoSpec;
use crate::facts::request_tracker::{RequestTracker, TrackedTopic};
use crate::facts::throttler::Throttler;
use cargo_metadata::{Metadata, MetadataCommand, PackageId, TargetKind};
use chrono::{DateTime, Utc};
use core::time::Duration;
use futures_util::future::join_all;
use ohno::{EnrichableExt, IntoAppError, app_err};
use crate::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::task::{JoinHandle, spawn_blocking};
/// Log target used by every `log` macro in this module.
///
/// NOTE(review): the leading space in " codebase" looks deliberate (possibly
/// to align log output with longer sibling targets) — confirm it is not a typo.
pub(super) const LOG_TARGET: &str = " codebase";
/// Maximum number of repositories fetched/analyzed concurrently; enforced by
/// the `Throttler` created in `Provider::new`.
const MAX_CONCURRENT_REQUESTS: usize = 5;
/// Facts provider that derives codebase data for crates by cloning each
/// crate's repository and inspecting it locally (`cargo metadata`, git
/// history, GitHub workflow detection, per-file source statistics).
#[derive(Debug, Clone)]
pub struct Provider {
// On-disk cache for both analysis results and cloned repositories.
cache: Cache,
// Bounds how many repositories are processed at once
// (see MAX_CONCURRENT_REQUESTS); shared across clones of the provider.
throttler: Arc<Throttler>,
}
/// Upper bound on a single `cargo metadata` invocation (5 minutes).
// `Duration::from_secs` is used instead of `Duration::from_mins` because the
// latter requires the `duration_constructors` feature / a very recent
// toolchain; the value is identical.
const METADATA_TIMEOUT: Duration = Duration::from_secs(5 * 60);
/// Upper bound on cloning or updating a git repository (5 minutes).
const GIT_REPO_TIMEOUT: Duration = Duration::from_secs(5 * 60);
/// Repository-wide data gathered once per repository and shared (via `Arc`)
/// across the analysis of every crate hosted in that repository.
#[derive(Debug, Clone)]
struct RepoData {
// Workspace metadata produced by `cargo metadata` at the repository root.
metadata: Arc<Metadata>,
// GitHub Actions workflow detection results (workflow/miri/clippy flags).
workflows: GitHubWorkflowInfo,
// Number of distinct contributors reported by git (0 if counting failed).
contributor_count: u64,
// Commit counts over trailing windows; all zero if stats collection failed.
commits_last_90_days: u64,
commits_last_180_days: u64,
commits_last_365_days: u64,
// Total commit count over the repository's full history.
commit_count: u64,
// Timestamps of the first/last commits; UNIX_EPOCH when stats were unavailable.
first_commit_at: DateTime<Utc>,
last_commit_at: DateTime<Utc>,
}
impl Provider {
/// Creates a provider backed by `cache`, with repository-level concurrency
/// limited to `MAX_CONCURRENT_REQUESTS`.
#[must_use]
pub fn new(cache: Cache) -> Self {
    let throttler = Throttler::new(MAX_CONCURRENT_REQUESTS);
    Self { cache, throttler }
}
/// Returns codebase data for each requested crate, served from cache where
/// possible.
///
/// Crates are grouped by hosting repository. If every crate of a repository
/// has a cache entry (either data or a recorded "no data" reason), the cached
/// values are used; otherwise the whole repository is fetched and re-analyzed.
/// One tracked request per repository is registered here and completed either
/// below (cache hit) or inside `fetch_and_analyze_repo`.
pub async fn get_codebase_data(
&self,
crates: impl IntoIterator<Item = CrateSpec> + Send + 'static,
tracker: &RequestTracker,
) -> impl Iterator<Item = (CrateSpec, ProviderResult<CodebaseData>)> {
// Group the requested crates by the repository that hosts them.
let repo_crates = crate_spec::by_repo(crates);
tracker.add_requests(TrackedTopic::Codebase, repo_crates.len() as u64);
let mut cached_results = Vec::new();
let mut needs_repo_fetch: HashMap<RepoSpec, Vec<CrateSpec>> = HashMap::default();
for (repo_spec, crates) in repo_crates {
let mut all_cached_data = Vec::new();
let mut needs_fresh_repo = false;
for crate_spec in &crates {
let crate_name = crate_spec.name();
let filename = Self::get_data_filename(crate_name, &repo_spec);
match self.cache.load::<CodebaseData>(&filename) {
CacheResult::Data(cached_data) => {
all_cached_data.push((crate_spec.clone(), ProviderResult::Found(cached_data)));
}
// A cached "no data" marker is treated as a valid (negative) answer.
CacheResult::NoData(reason) => {
all_cached_data.push((crate_spec.clone(), ProviderResult::Unavailable(reason.into())));
}
// Any single miss forces a fresh fetch of the whole repository;
// partially collected cached data for this repo is discarded.
CacheResult::Miss => {
needs_fresh_repo = true;
break; }
}
}
if needs_fresh_repo {
let _ = needs_repo_fetch.insert(repo_spec, crates);
} else {
cached_results.extend(all_cached_data);
// Fully served from cache: complete this repository's tracked request.
tracker.complete_request(TrackedTopic::Codebase);
}
}
// Fetch and analyze the remaining repositories concurrently; per-repo
// concurrency is bounded by the throttler inside fetch_and_analyze_repo.
let repo_results = join_all(needs_repo_fetch.into_iter().map(|(repo_spec, crates)| {
let provider = self.clone();
let tracker = tracker.clone();
tokio::spawn(async move {
provider.fetch_and_analyze_repo(repo_spec, crates, tracker).await
})
}))
.await
.into_iter()
.flat_map(|result| result.expect("task must not panic"));
// Log failures as a side effect while yielding all results to the caller.
cached_results
.into_iter()
.chain(repo_results)
.inspect(|(crate_spec, result)| {
if let ProviderResult::Error(e) = result {
log::error!(target: LOG_TARGET, "Could not analyze codebase for {crate_spec}: {e:#}");
} else if let ProviderResult::Unavailable(reason) = result {
log::warn!(target: LOG_TARGET, "Codebase data unavailable for {crate_spec}: {reason}");
}
})
}
/// Syncs one repository and produces a result for every requested crate it
/// hosts.
///
/// Exactly one tracked request is completed on every exit path. A sync error
/// is shared across all crates via `Arc`; a missing repository and repo-level
/// analysis failures are cached as "no data" so they are not retried on the
/// next run.
async fn fetch_and_analyze_repo(
self,
repo_spec: RepoSpec,
crates: Vec<CrateSpec>,
tracker: RequestTracker,
) -> Vec<(CrateSpec, ProviderResult<CodebaseData>)> {
// Bound global cross-repository concurrency; held until this fn returns.
let _permit = self.throttler.acquire().await;
let repo_path = self.get_repo_cache_path(&repo_spec);
match Self::sync_repo(&repo_path, &repo_spec).await {
Err(e) => {
tracker.complete_request(TrackedTopic::Codebase);
// One underlying error fans out to every crate of this repository.
let error = Arc::new(e);
return crates
.into_iter()
.map(|crate_spec| (crate_spec, ProviderResult::Error(Arc::clone(&error))))
.collect();
}
Ok(git::RepoStatus::NotFound) => {
let reason = format!("repository '{repo_spec}' not found");
log::debug!(target: LOG_TARGET, "{reason}");
let results: Vec<_> = crates
.into_iter()
.map(|crate_spec| {
let filename = Self::get_data_filename(crate_spec.name(), &repo_spec);
// Persist the "not found" reason so the repo isn't re-fetched every run.
if let Err(e) = self.cache.save_no_data(&filename, &reason) {
log::debug!(target: LOG_TARGET, "Could not save cache for {crate_spec}: {e:#}");
return (crate_spec, ProviderResult::Error(Arc::new(e)));
}
(crate_spec, ProviderResult::Unavailable(reason.clone().into()))
})
.collect();
tracker.complete_request(TrackedTopic::Codebase);
return results;
}
Ok(git::RepoStatus::Ok) => {}
}
let fetch_result = self.fetch_repo_data_core(&repo_spec, &repo_path).await;
let results = match fetch_result {
Ok(repo_data) => {
// Share the repo-wide data across per-crate analysis tasks.
let repo_data = Arc::new(repo_data);
join_all(crates.into_iter().map(|crate_spec| {
let provider = self.clone();
let repo_spec = repo_spec.clone();
let repo_data = Arc::clone(&repo_data);
tokio::spawn(provider.analyze_crate(crate_spec, repo_spec, repo_data))
}))
.await
.into_iter()
.map(|result| result.expect("task must not panic"))
.collect()
}
Err(e) => {
let reason = format!("{e:#}");
log::warn!(target: LOG_TARGET, "Could not analyze repository '{repo_spec}': {reason}");
crates
.into_iter()
.map(|crate_spec| {
let filename = Self::get_data_filename(crate_spec.name(), &repo_spec);
// Only record "no data" if there is no good cached data to preserve.
if !matches!(self.cache.load::<CodebaseData>(&filename), CacheResult::Data(_))
&& let Err(e) = self.cache.save_no_data(&filename, &reason)
{
log::debug!(target: LOG_TARGET, "Could not save cache for {crate_spec}: {e:#}");
}
(crate_spec, ProviderResult::Unavailable(reason.clone().into()))
})
.collect()
}
};
tracker.complete_request(TrackedTopic::Codebase);
results
}
/// Clones or updates the repository into `repo_path`, bounded by
/// `GIT_REPO_TIMEOUT`.
///
/// Returns the repository status on success. A timeout becomes a dedicated
/// error; any git failure is enriched with the repository it concerned.
async fn sync_repo(repo_path: &Path, repo_spec: &RepoSpec) -> Result<git::RepoStatus> {
    let sync = git::get_repo(repo_path, repo_spec.url());
    let status = tokio::time::timeout(GIT_REPO_TIMEOUT, sync)
        .await
        .map_err(|_elapsed| {
            app_err!(
                "git operation timed out after {} seconds for repository '{repo_spec}'",
                GIT_REPO_TIMEOUT.as_secs(),
            )
        })?
        .map_err(|e| e.enrich_with(|| format!("syncing repository '{repo_spec}'")))?;
    Ok(status)
}
/// Gathers repository-wide data: `cargo metadata`, git contributor and commit
/// statistics, and GitHub workflow detection.
///
/// # Errors
///
/// Fails when the repository root has no `Cargo.toml`, when `cargo metadata`
/// fails or times out, or when workflow analysis fails. Git statistic
/// failures are only logged and degraded to zero/epoch values instead.
async fn fetch_repo_data_core(&self, repo_spec: &RepoSpec, repo_path: &Path) -> Result<RepoData> {
let root_manifest = repo_path.join("Cargo.toml");
if !root_manifest.exists() {
return Err(app_err!("could not find Cargo.toml in root of repository '{repo_spec}'"));
}
log::debug!(target: LOG_TARGET, "Running cargo metadata for repository '{repo_spec}'");
// cargo metadata is blocking and can hang on pathological workspaces, so it
// runs on the blocking pool under a timeout.
let timeout_result = tokio::time::timeout(
METADATA_TIMEOUT,
spawn_blocking(move || MetadataCommand::new().manifest_path(&root_manifest).exec()),
)
.await;
let metadata = match timeout_result {
Err(_) => {
// NOTE: the spawned blocking task cannot be cancelled; on timeout it
// may keep running to completion in the background.
let timeout_secs = METADATA_TIMEOUT.as_secs();
return Err(app_err!(
"cargo metadata timed out after {timeout_secs} seconds for repository '{repo_spec}' - workspace may be too large or Cargo.toml is corrupted"
));
}
Ok(join_result) => match join_result {
Ok(Ok(metadata)) => metadata,
Ok(Err(e)) => {
return Err(e).into_app_err_with(|| format!("running cargo metadata for repository '{repo_spec}'"));
}
Err(e) => {
return Err(e).into_app_err_with(|| format!("joining cargo metadata task for repository '{repo_spec}'"));
}
},
};
log::debug!(target: LOG_TARGET, "Gathering commit statistics for repository '{repo_spec}'");
// Both git queries run concurrently; each degrades independently on failure.
let (contributor_count, commit_stats) = tokio::join!(
git::count_contributors(repo_path),
git::get_commit_stats(repo_path, &[90, 180, 365]),
);
let contributor_count = match contributor_count {
Ok(count) => count,
Err(e) => {
log::warn!(target: LOG_TARGET, "Could not count contributors for '{repo_spec}': {e:#}");
0
}
};
let commit_stats = match commit_stats {
Ok(stats) => stats,
Err(e) => {
log::warn!(target: LOG_TARGET, "Could not get commit statistics for '{repo_spec}': {e:#}");
// Fallback mirrors the shape of a real answer: three zeroed windows.
git::CommitStats {
commit_count: 0,
first_commit_at: DateTime::UNIX_EPOCH,
last_commit_at: DateTime::UNIX_EPOCH,
commits_per_window: vec![0, 0, 0],
}
}
};
log::debug!(target: LOG_TARGET, "Detecting workflows in repository '{repo_spec}'");
let repo_path_owned = repo_path.to_path_buf();
// Workflow sniffing reads the filesystem, so it also runs on the blocking pool.
let workflows = match spawn_blocking(move || sniff_github_workflows(&repo_path_owned))
.await
.expect("task must not panic")
{
Ok(info) => info,
Err(e) => {
return Err(e).into_app_err_with(|| format!("analyzing GitHub workflows in repository '{repo_spec}'"));
}
};
log::debug!(target: LOG_TARGET, "Analyzed repository '{repo_spec}', found {} packages", metadata.packages.len());
Ok(RepoData {
metadata: Arc::new(metadata),
workflows,
contributor_count,
// Window indices correspond to the &[90, 180, 365] request above.
commits_last_90_days: commit_stats.commits_per_window[0],
commits_last_180_days: commit_stats.commits_per_window[1],
commits_last_365_days: commit_stats.commits_per_window[2],
commit_count: commit_stats.commit_count,
first_commit_at: commit_stats.first_commit_at,
last_commit_at: commit_stats.last_commit_at,
})
}
/// Produces `CodebaseData` for a single crate using the shared repository
/// data, and caches the result.
///
/// Crate-specific failures (crate not in the workspace, no manifest parent,
/// source analysis error) are cached as "no data" so they are not retried on
/// the next run; a cache-save failure yields `ProviderResult::Error`.
async fn analyze_crate(
self,
crate_spec: CrateSpec,
repo_spec: RepoSpec,
repo_data: Arc<RepoData>,
) -> (CrateSpec, ProviderResult<CodebaseData>) {
let crate_name = crate_spec.name().to_string();
let filename = Self::get_data_filename(&crate_name, &repo_spec);
log::info!(target: LOG_TARGET, "Analyzing source code for {crate_spec} from repository '{repo_spec}'");
// The crate must appear as a package in the repository's workspace metadata.
let Some(package) = repo_data.metadata.packages.iter().find(|p| p.name == crate_name) else {
let reason = format!("crate '{crate_name}' not found in repository '{repo_spec}'");
log::debug!(target: LOG_TARGET, "{reason}");
if let Err(e) = self.cache.save_no_data(&filename, &reason) {
log::debug!(target: LOG_TARGET, "Could not save cache for {crate_spec}: {e:#}");
}
return (crate_spec, ProviderResult::Unavailable(reason.into()));
};
// The crate's directory is the parent of its Cargo.toml.
let Some(crate_path) = package.manifest_path.parent() else {
let reason = format!("package manifest has no parent directory for {crate_spec}");
if let Err(e) = self.cache.save_no_data(&filename, &reason) {
log::debug!(target: LOG_TARGET, "Could not save cache for {crate_spec}: {e:#}");
}
return (crate_spec, ProviderResult::Unavailable(reason.into()));
};
log::debug!(target: LOG_TARGET, "Found crate at {crate_path}");
let example_count = package.targets.iter().filter(|t| t.kind.contains(&TargetKind::Example)).count();
let transitive_dependencies = Self::count_transitive_dependencies(&package.id, &repo_data.metadata);
// Repo-wide statistics are filled in now; the per-file counters start at
// zero and are accumulated by analyze_source_files below.
let mut codebase_data = CodebaseData {
source_files_analyzed: 0,
production_lines: 0,
test_lines: 0,
comment_lines: 0,
unsafe_count: 0,
source_files_with_errors: 0,
example_count: example_count as u64,
transitive_dependencies: transitive_dependencies as u64,
workflows_detected: repo_data.workflows.workflows_detected,
miri_detected: repo_data.workflows.miri_detected,
clippy_detected: repo_data.workflows.clippy_detected,
contributors: repo_data.contributor_count,
commits_last_90_days: repo_data.commits_last_90_days,
commits_last_180_days: repo_data.commits_last_180_days,
commits_last_365_days: repo_data.commits_last_365_days,
commit_count: repo_data.commit_count,
first_commit_at: repo_data.first_commit_at,
last_commit_at: repo_data.last_commit_at,
};
if let Err(e) = Self::analyze_source_files(crate_path.as_std_path(), &mut codebase_data).await {
let reason = format!("{:#}", e.enrich_with(|| format!("analyzing source files for {crate_spec}")));
if let Err(e) = self.cache.save_no_data(&filename, &reason) {
log::debug!(target: LOG_TARGET, "Could not save cache for {crate_spec}: {e:#}");
}
return (crate_spec, ProviderResult::Unavailable(reason.into()));
}
let result = match self.cache.save(&filename, &codebase_data) {
Ok(()) => ProviderResult::Found(codebase_data),
Err(e) => ProviderResult::Error(Arc::new(e)),
};
log::debug!(target: LOG_TARGET, "Completed analysis of {crate_spec}");
(crate_spec, result)
}
/// Walks `<crate_path>/src`, analyzes every `.rs` file, and accumulates the
/// per-file statistics into `codebase_data`.
///
/// Limits: at most `MAX_FILES` files are analyzed, files larger than
/// `MAX_FILE_SIZE` bytes are skipped, and directory recursion stops at
/// `MAX_DEPTH`. A missing `src` directory is not an error.
///
/// # Errors
///
/// Currently always returns `Ok(())`; unreadable files are logged and skipped.
async fn analyze_source_files(crate_path: &Path, codebase_data: &mut CodebaseData) -> Result<()> {
    const MAX_FILES: usize = 10_000;
    const MAX_FILE_SIZE: u64 = 5_000_000;
    const MAX_DEPTH: usize = 50;
    let src_dir = crate_path.join("src");
    if !src_dir.exists() {
        return Ok(());
    }
    // Keep a copy for the truncation log below; `src_dir` itself is moved into
    // the directory-walking task.
    let src_display = src_dir.clone();
    // Directory walking is blocking I/O, so run it off the async runtime.
    let file_paths: Vec<_> = spawn_blocking(move || {
        walkdir::WalkDir::new(&src_dir)
            .follow_links(false)
            .max_depth(MAX_DEPTH)
            .into_iter()
            .filter_map(|e| match e {
                Ok(entry) => Some(entry),
                Err(err) => {
                    log::debug!(target: LOG_TARGET, "Could not walk directory: {err:#}");
                    None
                }
            })
            .filter(|e| !e.file_type().is_dir())
            .filter(|e| e.path().extension().and_then(|s| s.to_str()) == Some("rs"))
            .filter_map(|entry| {
                let metadata = match entry.metadata() {
                    Ok(m) => m,
                    Err(e) => {
                        log::debug!(target: LOG_TARGET, "Could not read metadata for {}: {e:#}", entry.path().display());
                        return None;
                    }
                };
                if metadata.len() > MAX_FILE_SIZE {
                    log::debug!(target: LOG_TARGET, "Skipping large file '{}' ({} bytes)", entry.path().display(), metadata.len());
                    return None;
                }
                Some(entry.path().to_path_buf())
            })
            // Apply the file-count cap *after* the size filter: otherwise
            // skipped oversized files would both shrink the analyzed set below
            // the cap and hide the "limit reached" condition checked below.
            .take(MAX_FILES)
            .collect()
    })
    .await
    .expect("task must not panic");
    if file_paths.is_empty() {
        return Ok(());
    }
    if file_paths.len() == MAX_FILES {
        log::debug!(
            target: LOG_TARGET,
            "File count limit ({MAX_FILES}) reached in {}, some files may not be analyzed",
            src_display.display()
        );
    }
    log::debug!(target: LOG_TARGET, "Analyzing {} source files", file_paths.len());
    // One blocking worker per available core; the semaphore caps how many
    // file reads/analyses are in flight at once.
    let num_workers = std::thread::available_parallelism().map(core::num::NonZero::get).unwrap_or(4);
    let semaphore = Arc::new(Semaphore::new(num_workers));
    let mut analysis_tasks: Vec<JoinHandle<Result<_, ohno::AppError>>> = Vec::with_capacity(file_paths.len());
    for path in file_paths {
        // Acquire before spawning so at most `num_workers` tasks run at once.
        // The semaphore is never closed, so acquisition can only fail on a bug.
        let permit = Arc::clone(&semaphore)
            .acquire_owned()
            .await
            .expect("Semaphore closed");
        let task = spawn_blocking(move || {
            // Hold the permit for the full duration of the blocking work.
            let _permit = permit;
            let content = fs::read_to_string(&path).into_app_err_with(|| format!("reading source file '{}'", path.display()))?;
            Ok(source_file_analyzer::analyze_source_file(&content))
        });
        analysis_tasks.push(task);
    }
    let results = join_all(analysis_tasks).await;
    for task_result in results {
        match task_result.expect("tasks must not panic") {
            Ok(file_stats) => {
                codebase_data.source_files_analyzed += 1;
                codebase_data.production_lines += file_stats.production_lines;
                codebase_data.test_lines += file_stats.test_lines;
                codebase_data.comment_lines += file_stats.comment_lines;
                codebase_data.unsafe_count += file_stats.unsafe_count;
                if file_stats.has_errors {
                    codebase_data.source_files_with_errors += 1;
                }
            }
            Err(e) => {
                log::debug!(target: LOG_TARGET, "Could not read source file, skipping: {e:#}");
            }
        }
    }
    Ok(())
}
/// Returns the `(host, owner, repo)` components of `repo_spec`, each passed
/// through `sanitize_path_component` so they are safe to use as path segments.
fn safe_repo_components(repo_spec: &RepoSpec) -> (String, String, String) {
    let host = sanitize_path_component(repo_spec.host());
    let owner = sanitize_path_component(repo_spec.owner());
    let repo = sanitize_path_component(repo_spec.repo());
    (host, owner, repo)
}
/// Directory under the cache where this repository is cloned:
/// `<cache>/repos/<host>/<owner>/<repo>` (all components sanitized).
fn get_repo_cache_path(&self, repo_spec: &RepoSpec) -> PathBuf {
    let (safe_host, safe_owner, safe_repo) = Self::safe_repo_components(repo_spec);
    let mut path = self.cache.dir().join("repos");
    for component in [safe_host, safe_owner, safe_repo] {
        path.push(component);
    }
    path
}
/// Relative cache file name under which a crate's analysis results are
/// stored; every path segment is sanitized.
fn get_data_filename(crate_name: &str, repo_spec: &RepoSpec) -> String {
    let safe_crate = sanitize_path_component(crate_name);
    let (safe_host, safe_owner, safe_repo) = Self::safe_repo_components(repo_spec);
    format!("analysis/{safe_host}/{safe_owner}/{safe_repo}/{safe_crate}.json")
}
/// Counts the unique transitive dependencies of `package_id` using the
/// resolve graph in `metadata`.
///
/// Returns 0 when the resolve graph is missing or the package is not in it.
/// Dependencies whose own node is absent from the graph are still counted,
/// but not expanded further.
fn count_transitive_dependencies(package_id: &PackageId, metadata: &Metadata) -> usize {
    use crate::HashSet;
    use std::collections::VecDeque;
    let Some(resolve) = &metadata.resolve else {
        log::debug!(target: LOG_TARGET, "No resolve graph in metadata, cannot count transitive dependencies");
        return 0;
    };
    // Index every resolve node by its package id for O(1) lookups.
    let node_map: HashMap<&PackageId, &cargo_metadata::Node> =
        resolve.nodes.iter().map(|n| (&n.id, n)).collect();
    let Some(node) = node_map.get(package_id) else {
        log::debug!(target: LOG_TARGET, "Could not find package '{package_id}' in resolve graph, cannot count transitive dependencies");
        return 0;
    };
    // Breadth-first traversal; `seen` both deduplicates and, at the end,
    // holds every dependency reachable from the package.
    let mut seen: HashSet<&PackageId> = HashSet::default();
    let mut frontier: VecDeque<&PackageId> = node.dependencies.iter().collect();
    while let Some(dep_id) = frontier.pop_front() {
        if !seen.insert(dep_id) {
            continue;
        }
        if let Some(dep_node) = node_map.get(dep_id) {
            frontier.extend(dep_node.dependencies.iter());
        }
    }
    seen.len()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::facts::RepoSpec;

    /// Sanitized components of a well-formed GitHub URL pass through unchanged.
    #[test]
    fn test_safe_repo_components() {
        let parsed = url::Url::parse("https://github.com/tokio-rs/tokio").unwrap();
        let spec = RepoSpec::parse(&parsed).unwrap();
        let (host, owner, repo) = Provider::safe_repo_components(&spec);
        assert_eq!(host, "github.com");
        assert_eq!(owner, "tokio-rs");
        assert_eq!(repo, "tokio");
    }

    /// Path-traversal sequences must not survive sanitization.
    #[test]
    fn test_safe_repo_components_sanitized() {
        let parsed = url::Url::parse("https://evil.com/../../etc/passwd").unwrap();
        let spec = RepoSpec::parse(&parsed).unwrap();
        let (host, owner, repo) = Provider::safe_repo_components(&spec);
        for component in [&host, &owner, &repo] {
            assert!(!component.contains(".."));
        }
    }

    /// The cache key embeds every repo component and ends in ".json".
    #[test]
    fn test_get_data_filename() {
        let parsed = url::Url::parse("https://github.com/tokio-rs/tokio").unwrap();
        let spec = RepoSpec::parse(&parsed).unwrap();
        let filename = Provider::get_data_filename("tokio", &spec);
        assert!(filename.starts_with("analysis/"));
        for needle in ["github.com", "tokio-rs", "tokio"] {
            assert!(filename.contains(needle));
        }
        assert!(Path::new(&filename).extension().is_some_and(|ext| ext.eq_ignore_ascii_case("json")));
    }

    /// The clone directory lives under "repos" and embeds every repo component.
    #[test]
    fn test_get_repo_cache_path() {
        let cache = Cache::new(
            "/tmp/cache",
            Duration::from_secs(3600),
            false,
        );
        let provider = Provider::new(cache);
        let parsed = url::Url::parse("https://github.com/tokio-rs/tokio").unwrap();
        let spec = RepoSpec::parse(&parsed).unwrap();
        let rendered = provider.get_repo_cache_path(&spec).to_string_lossy().into_owned();
        for needle in ["repos", "github.com", "tokio-rs", "tokio"] {
            assert!(rendered.contains(needle));
        }
    }

    /// Traversal sequences in crate names must not reach the cache key either.
    #[test]
    fn test_get_data_filename_sanitized() {
        let parsed = url::Url::parse("https://evil.com/../../etc/passwd").unwrap();
        let spec = RepoSpec::parse(&parsed).unwrap();
        assert!(!Provider::get_data_filename("../malicious", &spec).contains("../"));
    }
}