use std::path::PathBuf;
use std::sync::Arc;
use anyhow::Result;
use crossbeam::channel::{Receiver, Sender, bounded};
/// Runs `processor` once for every path in `work_items` on a pool of scoped
/// worker threads fed by a bounded channel.
///
/// A dedicated producer thread pushes the paths into the channel and each
/// worker pulls from it until the channel is drained and disconnected. The
/// order in which items are processed across workers is not specified.
///
/// # Arguments
///
/// * `work_items` - paths to hand to `processor`; an empty list returns
///   `Ok(())` immediately without spawning any thread.
/// * `processor` - callback invoked with a reference to each path.
/// * `num_workers` - requested number of worker threads. The effective count
///   is clamped to `1..=work_items.len()`, so a request of `0` still gets one
///   worker rather than leaving the queue without a consumer.
///
/// # Errors
///
/// Returns an error if a worker or the producer thread cannot be spawned, or
/// if any spawned thread panicked.
pub fn run<F>(work_items: Vec<PathBuf>, processor: F, num_workers: usize) -> Result<()>
where
    F: Fn(&PathBuf) + Send + Sync + 'static,
{
    if work_items.is_empty() {
        return Ok(());
    }

    // `work_items` is non-empty here, so the upper bound is at least 1 and the
    // clamp range is valid. The lower bound guarantees there is always a
    // consumer: with zero workers the producer would either block forever on a
    // full channel or finish with nothing processed.
    let actual_workers = num_workers.clamp(1, work_items.len());

    let buffer_multiplier = crate::config::CONFIG.scanner.channel_buffer_multiplier as usize;
    let buffer_size = (actual_workers * buffer_multiplier).max(64);
    let (work_tx, work_rx): (Sender<PathBuf>, Receiver<PathBuf>) = bounded(buffer_size);

    let processor = Arc::new(processor);
    let stack_size_bytes = crate::config::CONFIG.scanner.stack_size_mb as usize * 1024 * 1024;

    tracing::debug!(
        "Starting {} workers with {}MB stack size, channel buffer: {}",
        actual_workers,
        crate::config::CONFIG.scanner.stack_size_mb,
        buffer_size
    );

    crossbeam::thread::scope(|s| -> Result<()> {
        for worker_id in 0..actual_workers {
            let work_rx = work_rx.clone();
            let processor = Arc::clone(&processor);
            s.builder()
                .name(format!("scanner-{worker_id:02}"))
                .stack_size(stack_size_bytes)
                .spawn(move |_| {
                    tracing::debug!("Worker {} started", worker_id);
                    let mut file_count: u64 = 0;
                    // `recv` fails only once the channel is empty and every
                    // sender has been dropped, i.e. the producer is done.
                    while let Ok(work_item) = work_rx.recv() {
                        file_count += 1;
                        // Every 100 files, log the VmRSS line of
                        // /proc/self/status; skipped silently when the file
                        // cannot be read or has no such line.
                        if file_count % 100 == 0
                            && let Ok(status) = std::fs::read_to_string("/proc/self/status")
                            && let Some(line) = status.lines().find(|l| l.starts_with("VmRSS:"))
                        {
                            tracing::debug!(
                                "Worker {} after {} files: {}",
                                worker_id,
                                file_count,
                                line
                            );
                        }
                        processor(&work_item);
                    }
                    tracing::debug!("Worker {} finished after {} files", worker_id, file_count);
                })
                .map_err(|e| {
                    anyhow::anyhow!("Failed to spawn worker thread {}: {}", worker_id, e)
                })?;
        }

        // Every worker owns its own clone now. Dropping the original receiver
        // means the channel disconnects as soon as all workers are gone (for
        // example because they panicked), so the producer's `send` fails and
        // it stops instead of blocking forever on a full buffer.
        drop(work_rx);

        s.builder()
            .name("guardy-producer".to_string())
            .stack_size(stack_size_bytes)
            .spawn(move |_| {
                for item in work_items {
                    // A send error means no receiver is left; nothing more to do.
                    if work_tx.send(item).is_err() {
                        break;
                    }
                }
                // `work_tx` is dropped here, which lets idle workers leave
                // their `recv` loop once the queue is drained.
            })
            .map_err(|e| anyhow::anyhow!("Failed to spawn producer thread: {}", e))?;

        Ok(())
    })
    .map_err(|_| anyhow::anyhow!("Worker thread panicked"))?
}