use std::path::Path;
use std::sync::Arc;
use crossbeam_channel::bounded;
use ignore::{DirEntry, WalkBuilder, WalkState};
use crate::analyzer::stats::FileStats;
use crate::analyzer::FileAnalyzer;
use crate::error::Result;
use crate::filter::Filter;
/// Settings that control how `ParallelWalker` traverses a directory tree.
#[derive(Debug, Clone)]
pub struct WalkerConfig {
/// Thread count passed to the walker builder's `threads` setting.
pub threads: usize,
/// Whether the walker follows symbolic links (passed to `follow_links`).
pub follow_symlinks: bool,
/// Toggles the builder's `git_ignore`, `git_global` and `git_exclude` settings together.
pub use_gitignore: bool,
/// Maximum traversal depth; `None` leaves the builder's depth setting untouched.
pub max_depth: Option<usize>,
/// Entries registered one by one through `WalkBuilder::add_custom_ignore_filename`.
/// NOTE(review): that API takes ignore-file *names*, not glob patterns; confirm callers
/// populate this field accordingly.
pub custom_ignores: Vec<String>,
}
impl Default for WalkerConfig {
fn default() -> Self {
Self {
threads: num_cpus::get(),
follow_symlinks: false,
use_gitignore: true,
max_depth: None,
custom_ignores: Vec::new(),
}
}
}
/// Parallel directory walker that reads files and hands them to a `FileAnalyzer`
/// (see `walk_and_analyze`).
pub struct ParallelWalker {
/// Traversal settings applied to the walker builder on every call.
config: WalkerConfig,
}
impl ParallelWalker {
    /// Creates a walker that applies `config` to every traversal it performs.
    pub fn new(config: WalkerConfig) -> Self {
        Self { config }
    }

    /// Walks `root` in parallel, analyzes every regular file the filter accepts, and reports
    /// each outcome through the callbacks.
    ///
    /// Walker threads read and analyze files and push the results onto a bounded channel. A
    /// single consumer thread drains that channel, so `on_file` and `on_skip` are only ever
    /// called from one thread and never concurrently.
    ///
    /// * `on_file` receives the stats of every file the analyzer produced stats for.
    /// * `on_skip` receives the path of every non-directory entry rejected by `filter`, and
    ///   of every file that could not be read or for which the analyzer returned `Ok(None)`
    ///   or an error.
    ///
    /// Directories rejected by `filter` are pruned without calling `on_skip`. Entries the
    /// walker failed to yield, and accepted entries that are neither directories nor regular
    /// files, are ignored silently.
    ///
    /// # Errors
    ///
    /// No error is produced at present: I/O and analysis failures are reported through
    /// `on_skip` and the function returns `Ok(())`.
    ///
    /// # Panics
    ///
    /// Panics if `on_file` or `on_skip` panics. The walk is aborted first, so the panic is
    /// propagated instead of leaving the walker threads blocked on a full channel.
    pub fn walk_and_analyze<F, S>(
        &self,
        root: &Path,
        analyzer: Arc<FileAnalyzer>,
        filter: Arc<dyn Filter>,
        mut on_file: F,
        mut on_skip: S,
    ) -> Result<()>
    where
        F: FnMut(FileStats) + Send,
        S: FnMut(&Path) + Send,
    {
        /// Results buffered between the walker threads and the consumer before `send` blocks.
        const CHANNEL_CAPACITY: usize = 1000;
        /// Starting capacity of each walker thread's reusable read buffer.
        const INITIAL_BUF_CAPACITY: usize = 64 * 1024;
        /// A read buffer that grew beyond this is replaced so one large file does not pin
        /// memory for the rest of the walk.
        const MAX_RETAINED_BUF_CAPACITY: usize = 1_048_576;

        let (tx, rx) = bounded::<WalkResult>(CHANNEL_CAPACITY);

        let mut builder = WalkBuilder::new(root);
        builder
            // Do not let the walker drop hidden entries on its own.
            .hidden(false)
            .git_ignore(self.config.use_gitignore)
            .git_global(self.config.use_gitignore)
            .git_exclude(self.config.use_gitignore)
            .follow_links(self.config.follow_symlinks)
            .threads(self.config.threads)
            .max_depth(self.config.max_depth);
        // `add_custom_ignore_filename` registers the *name* of an ignore file to look for.
        for filename in &self.config.custom_ignores {
            builder.add_custom_ignore_filename(filename);
        }

        std::thread::scope(|s| {
            // `move` makes the consumer thread the owner of the receiver. If a callback
            // panics, unwinding drops the receiver and disconnects the channel, so walker
            // threads get a send error instead of blocking forever on a full buffer.
            let consumer = s.spawn(move || {
                for result in &rx {
                    match result {
                        WalkResult::File(stats) => on_file(stats),
                        WalkResult::Skipped(path) => on_skip(&path),
                    }
                }
            });

            builder.build_parallel().run(|| {
                // Per-thread state: a sender, shared handles, and a reusable read buffer.
                let tx = tx.clone();
                let filter = Arc::clone(&filter);
                let analyzer = Arc::clone(&analyzer);
                let mut buf: Vec<u8> = Vec::with_capacity(INITIAL_BUF_CAPACITY);

                Box::new(move |entry: std::result::Result<DirEntry, ignore::Error>| {
                    // Entries the walker could not yield are ignored (best effort).
                    let entry = match entry {
                        Ok(entry) => entry,
                        Err(_) => return WalkState::Continue,
                    };
                    let path = entry.path();
                    let file_type = entry.file_type();
                    let is_dir = file_type.map_or(false, |t| t.is_dir());

                    let outcome = if !filter.should_include(path, is_dir) {
                        if is_dir {
                            // Prune the whole subtree without reporting it.
                            return WalkState::Skip;
                        }
                        WalkResult::Skipped(path.to_path_buf())
                    } else if is_dir || !file_type.map_or(false, |t| t.is_file()) {
                        // Accepted directories are descended into; anything that is not a
                        // regular file is not analyzed.
                        return WalkState::Continue;
                    } else {
                        buf.clear();
                        let read = std::fs::File::open(path).and_then(|mut file| {
                            // The metadata length is only a sizing hint: skip it when it
                            // does not fit in `usize` or cannot be reserved, and let
                            // `read_to_end` grow the buffer as needed.
                            if let Ok(meta) = file.metadata() {
                                let hint =
                                    <usize as std::convert::TryFrom<u64>>::try_from(meta.len());
                                if let Ok(len) = hint {
                                    let _ = buf.try_reserve(len);
                                }
                            }
                            std::io::Read::read_to_end(&mut file, &mut buf)
                        });
                        let outcome = match read {
                            Ok(_) => match analyzer.analyze_from_bytes(path, &buf) {
                                Ok(Some(stats)) => WalkResult::File(stats),
                                Ok(None) | Err(_) => WalkResult::Skipped(path.to_path_buf()),
                            },
                            Err(_) => WalkResult::Skipped(path.to_path_buf()),
                        };
                        if buf.capacity() > MAX_RETAINED_BUF_CAPACITY {
                            buf = Vec::with_capacity(INITIAL_BUF_CAPACITY);
                        }
                        outcome
                    };

                    // A failed send means the consumer is gone (it only exits early when a
                    // callback panicked), so further work would be discarded: stop the walk.
                    if tx.send(outcome).is_ok() {
                        WalkState::Continue
                    } else {
                        WalkState::Quit
                    }
                })
            });

            // Dropping the last sender ends the consumer's receive loop.
            drop(tx);
            consumer.join().expect("consumer thread panicked");
        });
        Ok(())
    }
}
impl Default for ParallelWalker {
fn default() -> Self {
Self::new(WalkerConfig::default())
}
}
/// Message sent from the walker threads to the consumer thread over the channel.
enum WalkResult {
/// Stats the analyzer produced for one file; delivered to `on_file`.
File(FileStats),
/// Path of an entry that was filtered out, could not be read, or yielded no stats;
/// delivered to `on_skip`.
Skipped(std::path::PathBuf),
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tempfile::TempDir;

    /// Filter that accepts every path.
    struct AllowAll;

    impl Filter for AllowAll {
        fn should_include(&self, _path: &Path, _is_dir: bool) -> bool {
            true
        }
    }

    /// Runs a default-configured walker over `root` with an accept-all filter and returns
    /// how many times the `on_file` callback fired.
    fn count_analyzed(root: &Path, analyzer: Arc<FileAnalyzer>) -> usize {
        let hits = AtomicUsize::new(0);
        let filter = Arc::new(AllowAll);
        let walker = ParallelWalker::default();
        walker
            .walk_and_analyze(
                root,
                analyzer,
                filter,
                |_| {
                    hits.fetch_add(1, Ordering::SeqCst);
                },
                |_| {},
            )
            .unwrap();
        hits.load(Ordering::SeqCst)
    }

    /// The default configuration uses at least one thread, honors gitignore files and does
    /// not follow symlinks.
    #[test]
    fn test_walker_config_default() {
        let WalkerConfig {
            threads,
            use_gitignore,
            follow_symlinks,
            ..
        } = WalkerConfig::default();
        assert!(threads > 0);
        assert!(use_gitignore);
        assert!(!follow_symlinks);
    }

    /// Walking an empty directory reports no analyzed files.
    #[test]
    fn test_walk_empty_dir() {
        let dir = TempDir::new().unwrap();
        let registry = Arc::new(crate::language::LanguageRegistry::empty());
        let config = crate::config::Config::default();
        let analyzer = Arc::new(FileAnalyzer::new(registry, &config));
        assert_eq!(count_analyzed(dir.path(), analyzer), 0);
    }

    /// More files than the 1000-slot result channel can hold must still all be delivered.
    #[test]
    fn test_walk_many_files_no_deadlock() {
        let dir = TempDir::new().unwrap();
        let file_count = 1500;
        for i in 0..file_count {
            let target = dir.path().join(format!("file_{i}.rs"));
            std::fs::write(target, "fn main() {}\n").unwrap();
        }
        let registry = Arc::new(crate::language::LanguageRegistry::with_builtin().unwrap());
        let config = crate::config::Config::default();
        let analyzer = Arc::new(FileAnalyzer::new(registry, &config));
        assert_eq!(count_analyzed(dir.path(), analyzer), file_count);
    }
}